package org.datacleaner.util.batch;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.datacleaner.job.concurrent.PreviousErrorsExistException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/datacleaner/util/batch/BatchTransformationBuffer.class */
public class BatchTransformationBuffer<I, O> {
    public static final int DEFAULT_FLUSH_INTERVAL = 1000;
    public static final int DEFAULT_MAX_BATCH_SIZE = 20;
    private final BatchTransformation<I, O> _transformation;
    private final BlockingQueue<BatchEntry<I, O>> _queue;
    private final AtomicInteger _batchNo;
    private final int _maxBatchSize;
    private final ScheduledExecutorService _threadPool;
    private final int _flushInterval;
    private Throwable exception;
    private static final Logger logger = LoggerFactory.getLogger(BatchTransformationBuffer.class);
    private static final long[] AWAIT_TIMES = {20, 50, 100, 100, 200};

    public BatchTransformationBuffer(BatchTransformation<I, O> batchTransformation) {
        this(batchTransformation, 20, 1000);
    }

    public BatchTransformationBuffer(BatchTransformation<I, O> batchTransformation, int i, int i2) {
        this._transformation = batchTransformation;
        this._flushInterval = i2;
        this._maxBatchSize = i;
        this._queue = new ArrayBlockingQueue(i);
        this._batchNo = new AtomicInteger();
        this._threadPool = Executors.newScheduledThreadPool(1);
    }

    public void start() {
        logger.info("start()");
        this._threadPool.scheduleAtFixedRate(createFlushCommand(), this._flushInterval, this._flushInterval, TimeUnit.MILLISECONDS);
    }

    private Runnable createFlushCommand() {
        return new Runnable() { // from class: org.datacleaner.util.batch.BatchTransformationBuffer.1
            @Override // java.lang.Runnable
            public void run() {
                do {
                    try {
                        BatchTransformationBuffer.this.flushBuffer(true);
                    } catch (Throwable th) {
                        BatchTransformationBuffer.logger.warn("Cannot flush buffer", th);
                        BatchTransformationBuffer.this.exception = th;
                        BatchTransformationBuffer.this.shutdown();
                        return;
                    }
                } while (!BatchTransformationBuffer.this._queue.isEmpty());
            }
        };
    }

    public int getBatchCount() {
        return this._batchNo.get();
    }

    public void flushBuffer() {
        flushBuffer(false);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flushBuffer(boolean z) {
        if (this._queue.isEmpty()) {
            return;
        }
        if (!z && this._queue.size() < this._maxBatchSize) {
            logger.debug("Batch ignored, flush operation not scheduled and queue is not full");
            return;
        }
        ArrayList arrayList = new ArrayList(this._maxBatchSize);
        int drainTo = this._queue.drainTo(arrayList);
        if (drainTo == 0) {
            logger.debug("Batch ignored, no elements left in queue");
            return;
        }
        int incrementAndGet = this._batchNo.incrementAndGet();
        logger.info("Batch #{} - Preparing {} entries, scheduled={}", new Object[]{Integer.valueOf(incrementAndGet), Integer.valueOf(drainTo), Boolean.valueOf(z)});
        Object[] objArr = new Object[drainTo];
        for (int i = 0; i < drainTo; i++) {
            objArr[i] = ((BatchEntry) arrayList.get(i)).getInput();
        }
        this._transformation.map(new ArrayBatchSource(objArr), new BatchEntryBatchSink(arrayList));
        logger.info("Batch #{} - Finished", Integer.valueOf(incrementAndGet), Integer.valueOf(drainTo));
    }

    public void shutdown() {
        logger.info("shutdown()");
        this._threadPool.shutdown();
    }

    public O transform(I i) {
        BatchEntry<I, O> batchEntry = new BatchEntry<>(i);
        while (!this._queue.offer(batchEntry)) {
            flushBuffer();
        }
        int i2 = 0;
        while (true) {
            rethrowException();
            if (this._threadPool.isShutdown()) {
                rethrowException();
                throw new PreviousErrorsExistException("Transformer closed");
            }
            try {
                if (batchEntry.await(i2 < AWAIT_TIMES.length ? AWAIT_TIMES[i2] : AWAIT_TIMES[AWAIT_TIMES.length - 1])) {
                    return batchEntry.getOuput();
                }
                flushBuffer();
                i2++;
            } catch (Exception e) {
                if (this.exception == null) {
                    this.exception = e;
                }
                if (e instanceof RuntimeException) {
                    throw ((RuntimeException) e);
                }
                throw new IllegalStateException(e);
            }
        }
    }

    private void rethrowException() {
        if (this.exception != null) {
            if (!(this.exception instanceof RuntimeException)) {
                throw new RuntimeException(this.exception);
            }
            throw ((RuntimeException) this.exception);
        }
    }
}
