package org.jesterj.ingest.processors;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.utils.ConcurrentBiMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import org.jesterj.ingest.model.Document;
import org.jesterj.ingest.model.DocumentProcessor;
import org.jesterj.ingest.model.Status;
import org.jesterj.ingest.model.impl.NamedBuilder;

/* loaded from: input_file:org/jesterj/ingest/processors/BatchProcessor.class */
/**
 * Base class for document processors that accumulate documents and hand them to a
 * subclass-provided operation in batches.
 *
 * <p>Each document passed to {@link #processDocument(Document)} is converted with
 * {@link #convertDoc(Document)} and added to the current batch. A batch is sent either when a new
 * document arrives and the current batch already holds at least {@code batchSize} documents, or
 * when the partial-batch timer fires after {@code sendPartialBatchAfterMs} milliseconds. The timer
 * is restarted after every send attempt.
 *
 * @param <T> the converted form of a document, as produced by {@link #convertDoc(Document)}
 */
abstract class BatchProcessor<T> implements DocumentProcessor {
    private static final Logger log = LogManager.getLogger();

    /** Scheduler (one thread) that runs the partial-batch send; created on the first document. */
    private volatile ScheduledExecutorService sender;

    /** The currently pending partial-batch send, or {@code null} before the first one is scheduled. */
    private ScheduledFuture<?> scheduledSend;

    /** Documents collected so far, mapped to their converted form; replaced by {@link #takeBatch()}. */
    private ConcurrentBiMap<Document, T> batch;

    /** Number of documents the current batch must already hold for an arriving document to trigger a send. */
    private int batchSize = 100;

    /** Delay, in milliseconds, used when scheduling the partial-batch send. */
    private int sendPartialBatchAfterMs = 5000;

    /** Guards swapping of, and insertion into, {@link #batch}. */
    private final Object batchLock = new Object();

    /** Held for the duration of a send so that only one send runs at a time. */
    private final Object sendLock = new Object();

    /* loaded from: input_file:org/jesterj/ingest/processors/BatchProcessor$Builder.class */
    /**
     * Builder exposing the batching settings of the processor under construction.
     *
     * @param <T> the converted document type of the processor being built
     */
    public static abstract class Builder<T> extends NamedBuilder<BatchProcessor<T>> {
        /**
         * Sets the batch size threshold of the processor being built.
         *
         * @param batchSize number of documents a batch must hold before it is sent
         * @return this builder
         */
        public Builder<T> sendingBatchesOf(int batchSize) {
            // Wildcard cast rather than a raw type: only an int field is touched.
            ((BatchProcessor<?>) getObj()).batchSize = batchSize;
            return this;
        }

        /**
         * Sets the delay after which a partially filled batch is sent.
         *
         * @param millis delay in milliseconds
         * @return this builder
         */
        public Builder<T> sendingPartialBatchesAfterMs(int millis) {
            ((BatchProcessor<?>) getObj()).sendPartialBatchAfterMs = millis;
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Creates a processor with an empty batch. */
    public BatchProcessor() {
        synchronized (batchLock) {
            batch = new ConcurrentBiMap<>();
        }
    }

    /**
     * Converts the document, adds it to the current batch and marks it {@link Status#BATCHED}.
     * If the current batch already holds at least {@code batchSize} documents, that batch is taken
     * and sent on the calling thread (after the new document has been queued in the fresh batch).
     *
     * @param document the document to queue
     * @return always an empty array
     */
    @Override
    public Document[] processDocument(Document document) {
        ensureSenderStarted();

        // Conversion happens outside the batch lock.
        T converted = convertDoc(document);
        ConcurrentBiMap<Document, T> fullBatch = null;
        synchronized (batchLock) {
            if (batch.size() >= batchSize) {
                fullBatch = takeBatch();
            }
            batch.put(document, converted);
            document.setStatus(Status.BATCHED, "{} queued in position {} for sending to solr. Will be sent within {} milliseconds.", document.getId(), batch.size(), sendPartialBatchAfterMs);
            document.reportDocStatus();
        }

        // Send after releasing the batch lock so other threads can keep queueing.
        if (fullBatch != null) {
            sendBatch(fullBatch);
        }
        log.trace("Batch Processor ({}) processed {}", getName(), document.getId());
        return new Document[0];
    }

    /** Creates the scheduler and schedules the first partial-batch send, once, on first use. */
    private void ensureSenderStarted() {
        if (sender != null) {
            return;
        }
        synchronized (this) {
            if (sender == null) {
                sender = Executors.newScheduledThreadPool(1, contextCopyingThreadFactory());
                schedulePartialBatch();
            }
        }
    }

    /**
     * Returns a factory whose threads start by installing the Log4j {@link ThreadContext} map that
     * was current on the thread that asked for the new thread.
     */
    private static ThreadFactory contextCopyingThreadFactory() {
        return runnable -> {
            final Map<String, String> context = ThreadContext.getContext();
            return new Thread(() -> {
                ThreadContext.putAll(context);
                runnable.run();
            });
        };
    }

    /**
     * Replaces the current batch with a new empty one.
     *
     * @return the batch that was current before the swap
     */
    private ConcurrentBiMap<Document, T> takeBatch() {
        ConcurrentBiMap<Document, T> taken;
        synchronized (batchLock) {
            taken = batch;
            batch = new ConcurrentBiMap<>();
            // Rendering the whole batch as a string is wasted work unless trace is on.
            if (log.isTraceEnabled()) {
                log.trace("took batch {} with size {}", taken.toString(), taken.size());
            }
        }
        return taken;
    }

    /**
     * Sends the given batch via {@link #batchOperation(ConcurrentBiMap)} unless it is empty.
     * A failure is routed to {@link #individualFallbackOperation(ConcurrentBiMap, Exception)} or
     * {@link #perDocumentFailure(ConcurrentBiMap, Exception)} depending on
     * {@link #exceptionIndicatesDocumentIssue(Exception)}. On every path, including the empty-batch
     * one, the partial-batch send is rescheduled and the batch is cleared.
     *
     * @param oldBatch the batch to send; it is empty when this method returns
     */
    private void sendBatch(ConcurrentBiMap<Document, T> oldBatch) {
        synchronized (sendLock) {
            try {
                if (oldBatch.size() == 0) {
                    return;
                }
                batchOperation(oldBatch);
            } catch (InterruptedException e) {
                log.info("Send aborted due to system shutdown");
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.info("Batch Send failed", e);
                if (exceptionIndicatesDocumentIssue(e)) {
                    individualFallbackOperation(oldBatch, e);
                } else {
                    perDocumentFailure(oldBatch, e);
                }
            } finally {
                // Must run after the send attempt, never before it: clearing first would
                // discard the documents, and skipping the reschedule would stop the timer.
                schedulePartialBatch();
                oldBatch.clear();
            }
        }
    }

    /**
     * Cancels the pending partial-batch send, if any, and schedules a new one
     * {@code sendPartialBatchAfterMs} milliseconds from now.
     */
    private void schedulePartialBatch() {
        log.trace("Scheduling partial batch");
        if (scheduledSend != null) {
            // cancel(false): do not interrupt a send that is already running.
            scheduledSend.cancel(false);
        }
        scheduledSend = sender.schedule(() -> {
            log.trace("Scheduled Send Activated");
            sendBatch(takeBatch());
        }, sendPartialBatchAfterMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Reports the failure for every document in the batch via
     * {@link #perDocFailLogging(Exception, Document)}.
     *
     * @param concurrentBiMap the batch whose send failed
     * @param exc the exception raised by the send
     */
    protected void perDocumentFailure(ConcurrentBiMap<Document, ?> concurrentBiMap, Exception exc) {
        for (Document document : concurrentBiMap.keySet()) {
            perDocFailLogging(exc, document);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a logging context for the given document.
     *
     * @param document the document the context is created for
     * @return a new {@link DocumentLoggingContext} wrapping {@code document}
     */
    public DocumentLoggingContext createDocContext(Document document) {
        return new DocumentLoggingContext(document);
    }

    /**
     * Records the failure of a single document; called once per document by
     * {@link #perDocumentFailure(ConcurrentBiMap, Exception)}.
     *
     * @param exc the exception raised by the batch send
     * @param document a document that was part of the failed batch
     */
    protected abstract void perDocFailLogging(Exception exc, Document document);

    /**
     * Handles a failed batch when {@link #exceptionIndicatesDocumentIssue(Exception)} returned
     * {@code true} for the failure. The batch is cleared after this method returns.
     *
     * @param concurrentBiMap the batch whose send failed
     * @param exc the exception raised by the batch send
     */
    protected abstract void individualFallbackOperation(ConcurrentBiMap<Document, T> concurrentBiMap, Exception exc);

    /**
     * Sends a whole batch. Only called with a non-empty batch, while the send lock is held.
     *
     * @param concurrentBiMap the documents to send, mapped to their converted form
     * @throws Exception if the send fails; an {@link InterruptedException} is logged as a shutdown
     *     abort, any other exception triggers the failure handling of this class
     */
    protected abstract void batchOperation(ConcurrentBiMap<Document, T> concurrentBiMap) throws Exception;

    /**
     * Chooses the failure handling for a failed batch send.
     *
     * @param exc the exception raised by {@link #batchOperation(ConcurrentBiMap)}
     * @return {@code true} to run {@link #individualFallbackOperation(ConcurrentBiMap, Exception)},
     *     {@code false} to run {@link #perDocumentFailure(ConcurrentBiMap, Exception)}
     */
    protected abstract boolean exceptionIndicatesDocumentIssue(Exception exc);

    /**
     * Converts a document into the form stored in the batch. Called once per processed document,
     * before the batch lock is taken.
     *
     * @param document the document being processed
     * @return the converted form to store alongside the document
     */
    protected abstract T convertDoc(Document document);
}
