package org.mongoflink.sink;

import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.bson.Document;
import org.mongoflink.config.SinkConfiguration;
import org.mongoflink.internal.connection.MongoClientProvider;
import org.mongoflink.serde.DocumentSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mongoflink/sink/MongoBulkWriter.class */
public class MongoBulkWriter<IN> implements SinkWriter<IN, DocumentBulk, DocumentBulk> {
    private final MongoClientProvider collectionProvider;
    private transient MongoCollection<Document> collection;
    private DocumentSerializer<IN> serializer;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture scheduledFuture;
    private volatile transient Exception flushException;
    private final long maxSize;
    private final boolean flushOnCheckpoint;
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoBulkWriter.class);
    private final ConcurrentLinkedQueue<Document> currentBulk = new ConcurrentLinkedQueue<>();
    private final List<DocumentBulk> pendingBulks = new ArrayList();
    private final MongoBulkWriter<IN>.RetryPolicy retryPolicy = new RetryPolicy(3, 1000);
    private volatile transient boolean initialized = false;
    private volatile transient boolean closed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    @NotThreadSafe
    /* loaded from: input_file:org/mongoflink/sink/MongoBulkWriter$RetryPolicy.class */
    public class RetryPolicy {
        private final long maxRetries;
        private final long backoffMillis;
        private long currentRetries = 0;

        RetryPolicy(long j, long j2) {
            this.maxRetries = j;
            this.backoffMillis = j2;
        }

        /*  JADX ERROR: Failed to decode insn: 0x0007: MOVE_MULTI, method: org.mongoflink.sink.MongoBulkWriter.RetryPolicy.shouldBackoffRetry():boolean
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        boolean shouldBackoffRetry() {
            /*
                r6 = this;
                r0 = r6
                r1 = r0
                long r1 = r1.currentRetries
                r2 = 1
                long r1 = r1 + r2
                // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                r0.currentRetries = r1
                r0 = r6
                long r0 = r0.maxRetries
                int r-1 = (r-1 > r0 ? 1 : (r-1 == r0 ? 0 : -1))
                if (r-1 <= 0) goto L15
                r-1 = 0
                return r-1
                r-1 = r6
                r-1.backoff()
                r-1 = 1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: org.mongoflink.sink.MongoBulkWriter.RetryPolicy.shouldBackoffRetry():boolean");
        }

        private void backoff() {
            try {
                Thread.sleep(this.backoffMillis);
            } catch (InterruptedException e) {
            }
        }

        void reset() {
            this.currentRetries = 0L;
        }
    }

    public MongoBulkWriter(MongoClientProvider mongoClientProvider, DocumentSerializer<IN> documentSerializer, SinkConfiguration sinkConfiguration) {
        this.collectionProvider = mongoClientProvider;
        this.serializer = documentSerializer;
        this.maxSize = sinkConfiguration.getBulkFlushSize();
        this.flushOnCheckpoint = sinkConfiguration.isFlushOnCheckpoint();
        if (this.flushOnCheckpoint || sinkConfiguration.getBulkFlushInterval() <= 0) {
            return;
        }
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("mongodb-bulk-writer"));
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
            synchronized (this) {
                if (this.initialized && !this.closed) {
                    try {
                        rollBulkIfNeeded(true);
                        flush();
                    } catch (Exception e) {
                        this.flushException = e;
                    }
                }
            }
        }, sinkConfiguration.getBulkFlushInterval(), sinkConfiguration.getBulkFlushInterval(), TimeUnit.MILLISECONDS);
    }

    public void initializeState(List<DocumentBulk> list) {
        this.collection = this.collectionProvider.getDefaultCollection();
        Iterator<DocumentBulk> it = list.iterator();
        while (it.hasNext()) {
            Iterator<Document> it2 = it.next().getDocuments().iterator();
            while (it2.hasNext()) {
                this.currentBulk.add(it2.next());
                rollBulkIfNeeded();
            }
        }
        this.initialized = true;
    }

    public void write(IN in, SinkWriter.Context context) throws IOException {
        checkFlushException();
        this.currentBulk.add(this.serializer.serialize(in));
        rollBulkIfNeeded();
    }

    public List<DocumentBulk> prepareCommit(boolean z) throws IOException {
        if (this.flushOnCheckpoint || z) {
            rollBulkIfNeeded(true);
        }
        return this.pendingBulks;
    }

    public List<DocumentBulk> snapshotState() throws IOException {
        ArrayList arrayList = new ArrayList(1);
        synchronized (this) {
            DocumentBulk documentBulk = new DocumentBulk();
            Iterator<Document> it = this.currentBulk.iterator();
            while (it.hasNext()) {
                documentBulk.add(it.next());
            }
            arrayList.add(documentBulk);
            arrayList.addAll(this.pendingBulks);
            this.pendingBulks.clear();
        }
        return arrayList;
    }

    public void close() throws Exception {
        if (!this.flushOnCheckpoint) {
            synchronized (this) {
                if (!this.closed) {
                    try {
                        rollBulkIfNeeded(true);
                        flush();
                    } catch (Exception e) {
                        this.flushException = e;
                    }
                }
            }
        }
        this.closed = true;
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        if (this.collectionProvider != null) {
            this.collectionProvider.close();
        }
    }

    private synchronized void flush() {
        if (this.closed) {
            return;
        }
        ensureConnection();
        this.retryPolicy.reset();
        Iterator<DocumentBulk> it = this.pendingBulks.iterator();
        while (it.hasNext()) {
            DocumentBulk next = it.next();
            while (true) {
                try {
                    if (next.size() > 0) {
                        this.collection.insertMany(next.getDocuments());
                    }
                    it.remove();
                } catch (MongoException e) {
                    LOGGER.error("Failed to flush data to MongoDB", e);
                    if (!this.closed && this.retryPolicy.shouldBackoffRetry()) {
                    }
                }
            }
        }
    }

    private void ensureConnection() {
        try {
            this.collection.listIndexes();
        } catch (MongoException e) {
            LOGGER.warn("Connection is not available, try to reconnect", e);
            this.collectionProvider.recreateClient();
        }
    }

    private void rollBulkIfNeeded() {
        rollBulkIfNeeded(false);
    }

    private synchronized void rollBulkIfNeeded(boolean z) {
        int size = this.currentBulk.size();
        if (z || size >= this.maxSize) {
            DocumentBulk documentBulk = new DocumentBulk(this.maxSize);
            for (int i = 0; i < size; i++) {
                if (documentBulk.size() >= this.maxSize) {
                    this.pendingBulks.add(documentBulk);
                    documentBulk = new DocumentBulk(this.maxSize);
                }
                documentBulk.add(this.currentBulk.poll());
            }
            this.pendingBulks.add(documentBulk);
        }
    }

    private void checkFlushException() throws IOException {
        if (this.flushException != null) {
            throw new IOException("Failed to flush records to MongoDB", this.flushException);
        }
    }
}
