/*
 * Decompiled with CFR 0.152.
 */
package imported.vnext.org.apache.flink.connector.base.sink.sink.writer;

import imported.vnext.org.apache.flink.connector.base.sink.sink.writer.ElementConverter;
import imported.vnext.org.apache.flink.connector.base.sink.sink.writer.ForwardCompatibleMailboxExecutor;
import imported.vnext.org.apache.flink.connector.base.sink.sink.writer.RequestEntryWrapper;
import imported.vnext.org.apache.flink.connector.base.sink.sink.writer.SinkMetricGroup;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for sink writers that convert incoming elements into request entries, buffer
 * them, and hand them in batches to {@link #submitRequestEntries(List, Consumer)}.
 *
 * <p>A batch is sent when the buffer holds at least {@code maxBatchSize} entries, when the
 * buffered bytes reach {@code maxBatchSizeInBytes}, or when the processing-time timer
 * registered {@code maxTimeInBufferMS} after the first entry entered an empty buffer fires.
 * Entries reported as failed by the subclass are put back at the head of the buffer so that
 * they are retried before newer entries.
 *
 * <p>All mutable state is held in plain (non-volatile, unsynchronized) fields; request
 * completions and fatal errors are routed through the {@link ForwardCompatibleMailboxExecutor}
 * instead of being applied directly by the callback that reports them.
 *
 * @param <InputT> type of the elements handed to {@link #write(Object, SinkWriter.Context)}
 * @param <RequestEntryT> serializable type of the entries that are buffered and submitted
 */
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncSinkWriter.class);
    private final Sink.ProcessingTimeService timeService;
    private final ForwardCompatibleMailboxExecutor mailboxExecutor;
    /** Start time of the most recently completed request (0 until a request completes). */
    private long lastSendTimestamp = 0L;
    /** Completion time of the most recently completed request (Long.MAX_VALUE until then). */
    private long ackTime = Long.MAX_VALUE;
    private long currentFlushTimestamp = 0L;
    private long prevFlushTimestamp = 0L;
    private final SinkMetricGroup metrics;
    private final int maxBatchSize;
    private final int maxInFlightRequests;
    private final int maxBufferedRequests;
    private final long maxBatchSizeInBytes;
    private final long maxTimeInBufferMS;
    private final long maxRecordSizeInBytes;
    private final ElementConverter<InputT, RequestEntryT> elementConverter;
    /** Entries waiting to be sent; retried entries are re-inserted at the head. */
    private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries = new ArrayDeque<>();
    private int inFlightRequestsCount;
    /** Sum of the sizes of all buffered entries; kept as an exact integral byte count. */
    private long bufferedRequestEntriesTotalSizeInBytes;
    private boolean existsActiveTimerCallback = false;
    private final Consumer<Exception> fatalExceptionCons;

    /**
     * Sends the given batch to the destination.
     *
     * <p>The writer counts the request as in flight until the consumer is invoked; the list
     * passed to the consumer is re-queued at the head of the buffer in its original order.
     *
     * @param var1 the batch of request entries to send
     * @param var2 callback to invoke with the entries that must be retried
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> var1, Consumer<List<RequestEntryT>> var2);

    /**
     * Returns the size of a request entry in bytes; used for the per-record and per-batch
     * byte limits.
     *
     * @param var1 the request entry to measure
     * @return the size of the entry in bytes
     */
    protected abstract long getSizeInBytes(RequestEntryT var1);

    /**
     * Creates the writer and registers its metric gauges.
     *
     * @param elementConverter converts input elements into request entries; must not be null
     * @param context supplies the processing-time service and the metric group
     * @param maxBatchSize maximum number of entries per batch; must be positive
     * @param maxInFlightRequests maximum number of concurrently outstanding requests; must be
     *     positive
     * @param maxBufferedRequests buffer capacity in entries; must be strictly greater than
     *     {@code maxBatchSize}
     * @param maxBatchSizeInBytes maximum batch size in bytes; must be positive and at least
     *     {@code maxRecordSizeInBytes}
     * @param maxTimeInBufferMS delay in milliseconds of the timer that flushes the buffer;
     *     must be positive
     * @param maxRecordSizeInBytes maximum size of a single entry in bytes; must be positive
     * @throws NullPointerException if {@code elementConverter} is null
     * @throws IllegalArgumentException if any of the numeric constraints is violated
     */
    public AsyncSinkWriter(ElementConverter<InputT, RequestEntryT> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
        this.elementConverter = elementConverter;
        this.timeService = context.getProcessingTimeService();
        Preconditions.checkNotNull(elementConverter);
        Preconditions.checkArgument(maxBatchSize > 0);
        Preconditions.checkArgument(maxBufferedRequests > 0);
        Preconditions.checkArgument(maxInFlightRequests > 0);
        Preconditions.checkArgument(maxBatchSizeInBytes > 0L);
        Preconditions.checkArgument(maxTimeInBufferMS > 0L);
        Preconditions.checkArgument(maxRecordSizeInBytes > 0L);
        Preconditions.checkArgument(maxBufferedRequests > maxBatchSize, "The maximum number of requests that may be buffered should be strictly greater than the maximum number of requests per batch.");
        Preconditions.checkArgument(maxBatchSizeInBytes >= maxRecordSizeInBytes, "The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.");
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBufferedRequests = maxBufferedRequests;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMS = maxTimeInBufferMS;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
        this.inFlightRequestsCount = 0;
        this.bufferedRequestEntriesTotalSizeInBytes = 0L;

        // NOTE: createSinkMetricGroup is overridable and runs before subclass constructors,
        // so overrides must not depend on subclass fields.
        this.metrics = this.createSinkMetricGroup(context);
        // The int-valued gauges are widened explicitly so that they really produce Long
        // values instead of Integer objects hidden behind a Gauge<Long> reference.
        this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
        this.metrics.setInFlightRequestsGauge(() -> (long) this.inFlightRequestsCount);
        this.metrics.setBufferedRecordsGauge(() -> (long) this.bufferedRequestEntries.size());
        this.metrics.setMillisTimeBetweenFlushesGauge(() -> this.currentFlushTimestamp - this.prevFlushTimestamp);

        this.mailboxExecutor = new ForwardCompatibleMailboxExecutor();
        this.fatalExceptionCons = exception -> {
            // Rethrow from inside a mailbox action rather than from the reporting caller.
            ThrowingRunnable<Exception> rethrow = () -> {
                throw exception;
            };
            this.mailboxExecutor.execute(rethrow, "A fatal exception occurred in the sink that cannot be recovered from or should not be retried.");
        };
    }

    /**
     * Creates the metric group used by this writer. Called from the constructor.
     *
     * @param context the sink initialization context providing the underlying metric group
     * @return the metric group wrapper for this writer
     */
    protected SinkMetricGroup createSinkMetricGroup(Sink.InitContext context) {
        return new SinkMetricGroup(context.metricGroup());
    }

    /**
     * Registers a processing-time timer that fires {@code maxTimeInBufferMS} from now and
     * then flushes until the buffer is empty.
     */
    private void registerCallback() {
        Sink.ProcessingTimeService.ProcessingTimeCallback drainBuffer = instant -> {
            existsActiveTimerCallback = false;
            while (!bufferedRequestEntries.isEmpty()) {
                flush();
            }
        };
        long fireAt = timeService.getCurrentProcessingTime() + maxTimeInBufferMS;
        timeService.registerProcessingTimer(fireAt, drainBuffer);
        existsActiveTimerCallback = true;
    }

    /**
     * Converts the element, appends the resulting entry to the buffer and flushes as long as
     * a batch limit is reached.
     *
     * <p>While the buffer already holds {@code maxBufferedRequests} entries this method keeps
     * calling {@code tryYield()} on the mailbox executor before buffering the new entry.
     *
     * @param element the element to write
     * @param context the writer context passed on to the element converter
     * @throws IllegalArgumentException if the converted entry is larger than
     *     {@code maxRecordSizeInBytes}
     */
    public void write(InputT element, SinkWriter.Context context) {
        while (bufferedRequestEntries.size() >= maxBufferedRequests) {
            mailboxExecutor.tryYield();
        }
        addEntryToBuffer(elementConverter.apply(element, context), false);
        flushIfAble();
    }

    /** Flushes repeatedly while the buffer reaches the batch size or batch byte limit. */
    private void flushIfAble() {
        while (bufferedRequestEntries.size() >= maxBatchSize || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
            flush();
        }
    }

    /**
     * Sends one batch. Keeps calling {@code tryYield()} while {@code maxInFlightRequests}
     * requests are outstanding; returns without sending when no batch could be formed.
     */
    private void flush() {
        while (inFlightRequestsCount >= maxInFlightRequests) {
            mailboxExecutor.tryYield();
        }
        List<RequestEntryT> batch = createNextAvailableBatch();
        if (batch.isEmpty()) {
            return;
        }
        final long timestampOfRequest = System.currentTimeMillis();
        Consumer<List<RequestEntryT>> requestResult = failedRequestEntries -> {
            // Completion bookkeeping is enqueued on the mailbox instead of being executed by
            // whichever caller reports the result.
            ThrowingRunnable<Exception> completion = () -> completeRequest(failedRequestEntries, timestampOfRequest);
            mailboxExecutor.execute(completion, "Mark in-flight request as completed and requeue %d request entries", failedRequestEntries.size());
        };
        ++inFlightRequestsCount;
        prevFlushTimestamp = currentFlushTimestamp;
        currentFlushTimestamp = System.currentTimeMillis();
        submitRequestEntries(batch, requestResult);
    }

    /**
     * Removes entries from the head of the buffer until either {@code maxBatchSize} entries
     * were taken or the next entry would push the batch beyond {@code maxBatchSizeInBytes},
     * and updates the records-out and bytes-out counters.
     *
     * @return the entries of the next batch, possibly empty
     */
    private List<RequestEntryT> createNextAvailableBatch() {
        int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
        List<RequestEntryT> batch = new ArrayList<>(batchSize);
        // Accumulate in a long: the byte limit is a long, and an int accumulator would wrap
        // around for batches above 2 GiB, defeating the limit check below.
        long batchSizeBytes = 0L;
        for (int i = 0; i < batchSize; ++i) {
            long requestEntrySize = bufferedRequestEntries.peek().getSize();
            if (batchSizeBytes + requestEntrySize > maxBatchSizeInBytes) {
                break;
            }
            RequestEntryWrapper<RequestEntryT> elem = bufferedRequestEntries.remove();
            batch.add(elem.getRequestEntry());
            bufferedRequestEntriesTotalSizeInBytes -= requestEntrySize;
            batchSizeBytes += requestEntrySize;
        }
        metrics.getNumRecordsOutCounter().inc((long) batch.size());
        metrics.getNumBytesOutCounter().inc(batchSizeBytes);
        return batch;
    }

    /**
     * Marks one request as completed, records its timing, and re-queues the failed entries
     * at the head of the buffer.
     *
     * @param failedRequestEntries entries that must be retried
     * @param requestStartTime the time in milliseconds at which the request was created
     */
    private void completeRequest(List<RequestEntryT> failedRequestEntries, long requestStartTime) {
        lastSendTimestamp = requestStartTime;
        ackTime = System.currentTimeMillis();
        --inFlightRequestsCount;
        // Walk backwards: each entry is inserted at the head, so the failed entries end up
        // in their original relative order.
        ListIterator<RequestEntryT> iterator = failedRequestEntries.listIterator(failedRequestEntries.size());
        while (iterator.hasPrevious()) {
            addEntryToBuffer(iterator.previous(), true);
        }
    }

    /**
     * Validates the entry size and adds the entry to the buffer, registering the flush timer
     * if the buffer was empty and no timer is pending.
     *
     * @param entry the entry to buffer
     * @param insertAtHead {@code true} to insert before all buffered entries (used for
     *     retries), {@code false} to append
     * @throws IllegalArgumentException if the entry is larger than {@code maxRecordSizeInBytes}
     */
    private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
        RequestEntryWrapper<RequestEntryT> wrappedEntry = new RequestEntryWrapper<>(entry, getSizeInBytes(entry));
        // Reject oversized entries before touching any state, so that a rejected entry does
        // not leave a timer registered for a buffer it never entered.
        if (wrappedEntry.getSize() > maxRecordSizeInBytes) {
            throw new IllegalArgumentException(String.format("The request entry sent to the buffer was of size [%s], when the maxRecordSizeInBytes was set to [%s].", wrappedEntry.getSize(), maxRecordSizeInBytes));
        }
        if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
            registerCallback();
        }
        if (insertAtHead) {
            bufferedRequestEntries.addFirst(wrappedEntry);
        } else {
            bufferedRequestEntries.add(wrappedEntry);
        }
        bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize();
    }

    /**
     * Loops until no request is in flight and, when {@code flush} is set, until the buffer is
     * empty as well. Each iteration calls {@code tryYield()} on the mailbox executor and, if
     * {@code flush} is set, sends another batch.
     *
     * @param flush whether buffered entries must also be sent before returning
     * @return always an empty list
     */
    public List<Void> prepareCommit(boolean flush) {
        LOG.debug("prepareCommit invoked with flush: {}", flush);
        while (inFlightRequestsCount > 0 || (flush && !bufferedRequestEntries.isEmpty())) {
            mailboxExecutor.tryYield();
            if (flush) {
                flush();
            }
        }
        return Collections.emptyList();
    }

    /**
     * Returns the currently buffered request entries, in buffer order, as a single-element
     * list. Entries of requests that are still in flight are not part of the buffer and are
     * therefore not included.
     *
     * @return a list whose only element is the collection of buffered request entries
     */
    public List<Collection<RequestEntryT>> snapshotState() {
        LOG.debug("snapshotState invoked");
        List<RequestEntryT> pendingEntries = bufferedRequestEntries.stream()
                .map(RequestEntryWrapper::getRequestEntry)
                .collect(Collectors.toList());
        return Arrays.asList(pendingEntries);
    }

    /** Does nothing; this base class holds no resources that need to be released. */
    public void close() {
    }

    /**
     * Returns a consumer that enqueues a mailbox action which rethrows the given exception.
     *
     * @return the consumer to which unrecoverable exceptions should be passed
     */
    protected Consumer<Exception> getFatalExceptionCons() {
        return fatalExceptionCons;
    }
}

