/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.transaction.coordinator.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterMetricsStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TxnLogBufferedWriter<T> {
    private static final Logger log = LoggerFactory.getLogger(TxnLogBufferedWriter.class);
    /** Value written by {@code doFlush} as the first short of every batched entry. */
    public static final short BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER = 3585;
    /** Size in bytes of the magic-number short. */
    public static final int BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN = 2;
    /** Value written by {@code doFlush} as the second short of every batched entry. */
    public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION = 1;
    /** Size in bytes of the version short. */
    public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN = 2;
    /** Shared exception instance handed to callbacks when an add is attempted while CLOSING or CLOSED. */
    private static final ManagedLedgerException BUFFERED_WRITER_CLOSED_EXCEPTION = new ManagedLedgerException.ManagedLedgerFencedException(new Exception("Transaction log buffered write has closed"));
    /** Atomic accessor for the {@code state} field; used for the OPEN -> CLOSING/CLOSED transitions in {@code close()}. */
    private static final AtomicReferenceFieldUpdater<TxnLogBufferedWriter, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(TxnLogBufferedWriter.class, State.class, "state");
    /** Effective batching switch: true only if batching was requested AND batchedWriteMaxRecords > 1 (see constructor). */
    private final boolean batchEnabled;
    /** Destination of every write, batched or not. */
    private final ManagedLedger managedLedger;
    /** Timer used to schedule {@code timingFlushTask}. */
    private final Timer timer;
    // Executor on which batched adds, timed flushes and close() run. The buffer fields below are not
    // otherwise synchronized; the name suggests a single thread is expected - TODO confirm with callers.
    private final Executor singleThreadExecutorForWrite;
    /** Converts records (single or as a list) into ByteBufs and reports their serialized size. */
    private final DataSerializer<T> dataSerializer;
    /** Handle of the most recently scheduled timed flush; assigned in nextTimingTrigger, cancelled in close(). */
    private Timeout timeout;
    /** Flush as soon as this many records are buffered. */
    private final int batchedWriteMaxRecords;
    /** Flush once buffered bytes reach this value; single records at least this large bypass the buffer. */
    private final int batchedWriteMaxSize;
    /** Delay, in milliseconds, used when scheduling the timed flush. */
    private final int batchedWriteMaxDelayInMillis;
    /** Records buffered since the last flush; serialized as one list by doFlush. */
    private final ArrayList<T> dataArray;
    /** Callbacks (and, once flushed, the combined buffer) belonging to the records in dataArray. */
    private FlushContext flushContext;
    /** Sum of the serialized sizes of the records currently in dataArray. */
    private long bytesSize;
    /** Lifecycle state; written through STATE_UPDATER, read directly elsewhere. */
    private volatile State state;
    /** Callback passed to the managed ledger for batched writes; fans the result out to each buffered request. */
    private final BookKeeperBatchedWriteCallback bookKeeperBatchedWriteCallback = new BookKeeperBatchedWriteCallback();
    /** Metrics sink notified with (record count, byte size, age of oldest record) on each flush trigger. */
    private final TxnLogBufferedWriterMetricsStats metrics;
    // Timer callback: unless the timeout was cancelled, hand over to trigFlushByTimingTask, which does the
    // actual work on the write executor and then schedules the next run.
    private final TimerTask timingFlushTask = timeout -> {
        if (timeout.isCancelled()) {
            return;
        }
        this.trigFlushByTimingTask();
    };

    /**
     * Creates a writer that appends transaction-log records to {@code managedLedger}, optionally
     * combining several records into one batched entry.
     *
     * <p>Batching is only effective when {@code batchEnabled} is true and
     * {@code batchedWriteMaxRecords} is greater than 1; otherwise every record is written on its own
     * and a warning is logged if batching had been requested.
     *
     * @param managedLedger destination of all writes
     * @param executor executor on which batched adds, flushes and close are run
     * @param timer timer used to schedule the periodic flush
     * @param dataSerializer serializer for single records and for lists of records
     * @param batchedWriteMaxRecords record-count threshold that triggers a flush
     * @param batchedWriteMaxSize byte-size threshold that triggers a flush
     * @param batchedWriteMaxDelayInMillis delay between timed flushes, in milliseconds
     * @param batchEnabled whether batching is requested
     * @param metrics metrics sink; must not be null
     * @throws IllegalArgumentException if {@code metrics} is null
     */
    public TxnLogBufferedWriter(ManagedLedger managedLedger, Executor executor, Timer timer, DataSerializer<T> dataSerializer, int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis, boolean batchEnabled, TxnLogBufferedWriterMetricsStats metrics) {
        // Validate first: a rejected construction must not take a FlushContext out of the Recycler
        // or publish a state value for an object that will never be used.
        if (metrics == null) {
            throw new IllegalArgumentException("Build TxnLogBufferedWriter error: param metrics can not be null");
        }
        if (batchEnabled && batchedWriteMaxRecords <= 1) {
            log.warn("Transaction Log Buffered Writer with the metrics name beginning with {} has batching enabled yet the maximum batch size was configured to less than or equal to 1 record, hence due to performance reasons batching is disabled", (Object) metrics.getMetricsPrefix());
        }
        this.batchEnabled = batchEnabled && batchedWriteMaxRecords > 1;
        this.managedLedger = managedLedger;
        this.singleThreadExecutorForWrite = executor;
        this.dataSerializer = dataSerializer;
        this.batchedWriteMaxRecords = batchedWriteMaxRecords;
        this.batchedWriteMaxSize = batchedWriteMaxSize;
        this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
        this.metrics = metrics;
        this.timer = timer;
        this.flushContext = FlushContext.newInstance();
        this.dataArray = new ArrayList<>();
        STATE_UPDATER.set(this, State.OPEN);
        // The periodic flush only makes sense when records are actually being buffered.
        if (this.batchEnabled) {
            this.nextTimingTrigger();
        }
    }

    /**
     * Schedules the next timed flush {@code batchedWriteMaxDelayInMillis} milliseconds from now and
     * remembers its handle in {@code timeout}. Nothing is scheduled once the writer is CLOSING or
     * CLOSED. A scheduling failure is logged and otherwise ignored.
     */
    private void nextTimingTrigger() {
        try {
            final boolean shuttingDown = State.CLOSED == this.state || State.CLOSING == this.state;
            if (!shuttingDown) {
                this.timeout = this.timer.newTimeout(this.timingFlushTask, this.batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
            }
        } catch (Exception scheduleFailure) {
            log.error("Start timing flush trigger failed. managedLedger: " + this.managedLedger.getName(), scheduleFailure);
        }
    }

    /**
     * Appends one record to the transaction log.
     *
     * <p>With batching disabled the record is serialized and written to the managed ledger
     * immediately on the calling thread. With batching enabled the work is handed to the write
     * executor, where the record is buffered (or written directly if it is too large to batch).
     *
     * <p>Failures are reported through {@code callback.addFailed}: a closed writer yields the shared
     * "closed" exception, and a serializer failure is wrapped in a
     * {@code ManagedLedgerInterceptException}, matching what the batched path does.
     *
     * @param data record to append
     * @param callback notified with the resulting position or the failure
     * @param ctx opaque value passed back to {@code callback}
     */
    public void asyncAddData(T data, AddDataCallback callback, Object ctx) {
        if (!this.batchEnabled) {
            if (this.state == State.CLOSING || this.state == State.CLOSED) {
                callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
                return;
            }
            ByteBuf byteBuf;
            try {
                byteBuf = this.dataSerializer.serialize(data);
            } catch (Exception e) {
                // Report through the callback instead of throwing out of an async API, exactly as
                // internalAsyncAddData does for the batched path.
                callback.addFailed(new ManagedLedgerException.ManagedLedgerInterceptException(e), ctx);
                return;
            }
            // The AsyncAddArgs keeps the buffer so it is released when the args are recycled.
            this.managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE, AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), byteBuf));
            return;
        }
        CompletableFuture.runAsync(() -> this.internalAsyncAddData(data, callback, ctx), this.singleThreadExecutorForWrite).exceptionally(e -> {
            log.warn("Execute 'internalAsyncAddData' fail", e);
            return null;
        });
    }

    /**
     * Batched-mode add, run on the write executor.
     *
     * <p>Rejects the request if the writer is CLOSING or CLOSED. A record whose serialized size is at
     * least {@code batchedWriteMaxSize} is not buffered: whatever is already buffered is flushed
     * first, then the record is written on its own. Any other record is queued together with its
     * callback, after which the count/size thresholds are checked. Serializer and queuing failures
     * are reported to {@code callback} wrapped in a {@code ManagedLedgerInterceptException}.
     *
     * @param data record to append
     * @param callback notified with the resulting position or the failure
     * @param ctx opaque value passed back to {@code callback}
     */
    private void internalAsyncAddData(T data, AddDataCallback callback, Object ctx) {
        if (State.CLOSING == this.state || State.CLOSED == this.state) {
            callback.addFailed(BUFFERED_WRITER_CLOSED_EXCEPTION, ctx);
            return;
        }

        final int serializedSize;
        try {
            serializedSize = this.dataSerializer.getSerializedSize(data);
        } catch (Exception sizeFailure) {
            callback.addFailed(new ManagedLedgerException.ManagedLedgerInterceptException(sizeFailure), ctx);
            return;
        }

        if (serializedSize < this.batchedWriteMaxSize) {
            // Normal case: queue the callback first, then the record, then see whether a flush is due.
            try {
                this.flushContext.addCallback(callback, ctx);
            } catch (Exception queueFailure) {
                callback.addFailed(new ManagedLedgerException.ManagedLedgerInterceptException(queueFailure), ctx);
                return;
            }
            this.dataArray.add(data);
            this.bytesSize += serializedSize;
            this.trigFlushIfReachMaxRecordsOrMaxSize();
            return;
        }

        // Oversized record: push out what is buffered, then write this one as a stand-alone entry.
        this.trigFlushByLargeSingleData();
        final ByteBuf payload;
        try {
            payload = this.dataSerializer.serialize(data);
        } catch (Exception serializeFailure) {
            callback.addFailed(new ManagedLedgerException.ManagedLedgerInterceptException(serializeFailure), ctx);
            return;
        }
        this.managedLedger.asyncAddEntry(payload, DisabledBatchCallback.INSTANCE, AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis(), payload));
    }

    /**
     * Timed flush. On the write executor, flushes the buffer if it holds at least one request
     * (reporting the trigger to the metrics first). When that task finishes, with or without an
     * error, the next timed flush is scheduled; an error is logged at warn level.
     */
    private void trigFlushByTimingTask() {
        final Runnable flushIfPending = () -> {
            final ArrayList<AsyncAddArgs> pending = this.flushContext.asyncAddArgsList;
            if (!pending.isEmpty()) {
                this.metrics.triggerFlushByByMaxDelay(pending.size(), this.bytesSize, System.currentTimeMillis() - pending.get(0).addedTime);
                this.doFlush();
            }
        };
        CompletableFuture.runAsync(flushIfPending, this.singleThreadExecutorForWrite).whenComplete((ignore, failure) -> {
            if (failure != null) {
                log.warn("Execute 'trigFlushByTimingTask' fail", failure);
            }
            this.nextTimingTrigger();
        });
    }

    /**
     * Flushes the buffer if it has reached the record-count limit or, failing that, the byte-size
     * limit. The matching metrics trigger is recorded before the flush; the count limit takes
     * precedence when both are reached.
     */
    private void trigFlushIfReachMaxRecordsOrMaxSize() {
        final ArrayList<AsyncAddArgs> pending = this.flushContext.asyncAddArgsList;
        final int pendingCount = pending.size();
        if (pendingCount >= this.batchedWriteMaxRecords) {
            this.metrics.triggerFlushByRecordsCount(pendingCount, this.bytesSize, System.currentTimeMillis() - pending.get(0).addedTime);
            this.doFlush();
        } else if (this.bytesSize >= (long) this.batchedWriteMaxSize) {
            this.metrics.triggerFlushByBytesSize(pendingCount, this.bytesSize, System.currentTimeMillis() - pending.get(0).addedTime);
            this.doFlush();
        }
    }

    /**
     * Flushes whatever is currently buffered because an oversized record is about to be written on
     * its own. Does nothing when the buffer is empty.
     */
    private void trigFlushByLargeSingleData() {
        final ArrayList<AsyncAddArgs> pending = this.flushContext.asyncAddArgsList;
        if (!pending.isEmpty()) {
            this.metrics.triggerFlushByLargeSingleData(pending.size(), this.bytesSize, System.currentTimeMillis() - pending.get(0).addedTime);
            this.doFlush();
        }
    }

    /**
     * Writes everything currently buffered as a single batched entry and starts a new, empty batch.
     *
     * <p>The entry is a 4-byte prefix (magic number, then version, both shorts) followed by the
     * serialized list of records. The combined buffer is stored in the flush context so that it is
     * released when the context is recycled by the write callback.
     *
     * <p>If the writer is CLOSING or CLOSED nothing is serialized or written; the queued callbacks are
     * failed with the shared "closed" exception. If the serializer throws, the queued callbacks are
     * failed with a {@code ManagedLedgerInterceptException} and the batch is dropped, so that one bad
     * batch cannot block every later flush or leave its callers waiting forever.
     */
    private void doFlush() {
        final FlushContext flushing = this.flushContext;

        if (State.CLOSING == this.state || State.CLOSED == this.state) {
            this.failureCallbackByContextAndRecycle(flushing, BUFFERED_WRITER_CLOSED_EXCEPTION);
            this.startNewBatch();
            return;
        }

        // Serialize before allocating the prefix so that a serializer failure cannot leak the prefix buffer.
        final ByteBuf contentByteBuf;
        try {
            contentByteBuf = this.dataSerializer.serialize(this.dataArray);
        } catch (Exception e) {
            log.error("Serialize batched transaction log data failed. managedLedger: {}", this.managedLedger.getName(), e);
            this.failureCallbackByContextAndRecycle(flushing, new ManagedLedgerException.ManagedLedgerInterceptException(e));
            this.startNewBatch();
            return;
        }

        final ByteBuf prefixByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN + BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN);
        prefixByteBuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER);
        prefixByteBuf.writeShort(BATCHED_ENTRY_DATA_PREFIX_VERSION);
        final ByteBuf wholeByteBuf = Unpooled.wrappedUnmodifiableBuffer(prefixByteBuf, contentByteBuf);
        flushing.byteBuf = wholeByteBuf;
        this.managedLedger.asyncAddEntry(wholeByteBuf, this.bookKeeperBatchedWriteCallback, flushing);
        this.startNewBatch();
    }

    /** Empties the record buffer and installs a fresh flush context for the next batch. */
    private void startNewBatch() {
        this.dataArray.clear();
        this.flushContext = FlushContext.newInstance();
        this.bytesSize = 0L;
    }

    /**
     * Closes the writer.
     *
     * <p>With batching disabled this only moves the state from OPEN to CLOSED. With batching enabled
     * the state moves to CLOSING and, on the write executor, every request still waiting in the
     * buffer is failed, the timed flush is cancelled and the state becomes CLOSED. Requests that were
     * already handed to the managed ledger are completed by its callback as usual.
     *
     * @return a future completed once the writer is CLOSED; already completed if batching is disabled
     *         or if the writer was not OPEN when this method was called
     */
    public CompletableFuture<Void> close() {
        if (!this.batchEnabled) {
            STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
            return CompletableFuture.completedFuture(null);
        }
        // Only the caller that wins the OPEN -> CLOSING transition performs the shutdown.
        if (!STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSING)) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        FutureUtil.safeRunAsync(() -> {
            // Detach the pending context before recycling it: once recycled it goes back to the pool,
            // so this writer must no longer reach it (a late timed-flush task still reads flushContext).
            final FlushContext pending = this.flushContext;
            this.flushContext = FlushContext.newInstance();
            this.dataArray.clear();
            this.bytesSize = 0L;
            this.failureCallbackByContextAndRecycle(pending, new ManagedLedgerException.ManagedLedgerFencedException(new Exception("Transaction log buffered write has closed")));
            // timeout stays null if scheduling the timed flush failed; that must not abort the close.
            final Timeout scheduled = this.timeout;
            if (scheduled != null && !scheduled.isCancelled()) {
                scheduled.cancel();
            }
            STATE_UPDATER.set(this, State.CLOSED);
            closeFuture.complete(null);
        }, this.singleThreadExecutorForWrite, closeFuture);
        return closeFuture;
    }

    /**
     * Fails every request queued in {@code flushContext} with {@code ex} and then recycles the
     * context. The context is recycled even if notifying the requests throws. A null context is
     * ignored.
     *
     * @param flushContext context whose queued requests should be failed; may be null
     * @param ex failure passed to each request's callback
     */
    private void failureCallbackByContextAndRecycle(FlushContext flushContext, ManagedLedgerException ex) {
        if (flushContext != null) {
            try {
                final ArrayList<AsyncAddArgs> queued = flushContext.asyncAddArgsList;
                if (queued != null) {
                    // The individual args are recycled by flushContext.recycle(), hence recycle=false here.
                    queued.forEach(queuedArgs -> this.failureCallbackByArgs(queuedArgs, ex, false));
                }
            } finally {
                flushContext.recycle();
            }
        }
    }

    /**
     * Notifies one request of a failure. An exception thrown by the request's callback is logged and
     * swallowed. A null {@code asyncAddArgs} is ignored.
     *
     * @param asyncAddArgs request to notify; may be null
     * @param ex failure passed to the request's callback
     * @param recycle whether to recycle {@code asyncAddArgs} after the callback has run
     */
    private void failureCallbackByArgs(AsyncAddArgs asyncAddArgs, ManagedLedgerException ex, boolean recycle) {
        if (asyncAddArgs != null) {
            try {
                asyncAddArgs.callback.addFailed(ex, asyncAddArgs.ctx);
            } catch (Exception callbackFailure) {
                log.error("After writing to the transaction batched log failure, the callback executed also failed. managedLedger: " + this.managedLedger.getName(), callbackFailure);
            } finally {
                if (recycle) {
                    asyncAddArgs.recycle();
                }
            }
        }
    }

    /**
     * @return the metrics sink supplied at construction time
     */
    public TxnLogBufferedWriterMetricsStats getMetrics() {
        return metrics;
    }

    private class BookKeeperBatchedWriteCallback
    implements AsyncCallbacks.AddEntryCallback {
        private BookKeeperBatchedWriteCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
            FlushContext flushContext = (FlushContext)ctx;
            try {
                int batchSize = flushContext.asyncAddArgsList.size();
                for (int batchIndex = 0; batchIndex < batchSize; ++batchIndex) {
                    AsyncAddArgs asyncAddArgs = flushContext.asyncAddArgsList.get(batchIndex);
                    TxnBatchedPositionImpl txnBatchedPosition = new TxnBatchedPositionImpl(position, batchSize, batchIndex);
                    try {
                        asyncAddArgs.callback.addComplete((Position)txnBatchedPosition, asyncAddArgs.ctx);
                        continue;
                    }
                    catch (Exception e) {
                        log.error("After writing to the transaction batched log complete, the callback failed. managedLedger: " + TxnLogBufferedWriter.this.managedLedger.getName(), (Throwable)e);
                    }
                }
            }
            catch (Exception e) {
                log.error("Handle callback fail after ML write complete", (Throwable)e);
            }
            finally {
                flushContext.recycle();
            }
        }

        public void addFailed(ManagedLedgerException exception, Object ctx) {
            try {
                FlushContext flushContext = (FlushContext)ctx;
                TxnLogBufferedWriter.this.failureCallbackByContextAndRecycle(flushContext, exception);
            }
            catch (Exception e) {
                log.error("Handle callback fail after ML write fail", (Throwable)e);
            }
        }
    }

    /**
     * Serialization strategy used by the writer for records of type {@code T}.
     *
     * @param <T> record type
     */
    public interface DataSerializer<T> {
        /**
         * @param data a single record
         * @return the size used by the writer for its byte-size accounting and thresholds
         */
        int getSerializedSize(T data);

        /**
         * @param data a single record
         * @return buffer written as a stand-alone (non-batched) entry
         */
        ByteBuf serialize(T data);

        /**
         * @param dataArray the buffered records of one batch
         * @return buffer appended after the batch prefix to form a batched entry
         */
        ByteBuf serialize(ArrayList<T> dataArray);
    }

    private static class FlushContext {
        private static final Recycler<FlushContext> FLUSH_CONTEXT_RECYCLER = new Recycler<FlushContext>(){

            protected FlushContext newObject(Recycler.Handle<FlushContext> handle) {
                return new FlushContext(handle);
            }
        };
        private final Recycler.Handle<FlushContext> handle;
        private final ArrayList<AsyncAddArgs> asyncAddArgsList;
        private ByteBuf byteBuf;

        private FlushContext(Recycler.Handle<FlushContext> handle) {
            this.handle = handle;
            this.asyncAddArgsList = new ArrayList(8);
        }

        private static FlushContext newInstance() {
            return (FlushContext)FLUSH_CONTEXT_RECYCLER.get();
        }

        public void recycle() {
            for (AsyncAddArgs asyncAddArgs : this.asyncAddArgsList) {
                asyncAddArgs.recycle();
            }
            if (this.byteBuf != null) {
                this.byteBuf.release();
                this.byteBuf = null;
            }
            this.asyncAddArgsList.clear();
            this.handle.recycle((Object)this);
        }

        public void addCallback(AddDataCallback callback, Object ctx) {
            AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, System.currentTimeMillis());
            this.asyncAddArgsList.add(asyncAddArgs);
        }
    }

    /** Lifecycle of the writer; adds are rejected in CLOSING and CLOSED. */
    private static enum State {
        // Initial state set by the constructor.
        OPEN,
        // Set by close() in batched mode while the shutdown task is pending on the write executor.
        CLOSING,
        // Terminal state: set directly by close() when batching is disabled, or by the shutdown task.
        CLOSED;

    }

    /** Receives the outcome of one {@code asyncAddData} request. */
    public interface AddDataCallback {
        /**
         * The record was written.
         *
         * @param position position reported for the record; for batched writes this is a
         *                 {@code TxnBatchedPositionImpl}
         * @param context the {@code ctx} value given to {@code asyncAddData}
         */
        void addComplete(Position position, Object context);

        /**
         * The record was not written.
         *
         * @param exception reason for the failure
         * @param context the {@code ctx} value given to {@code asyncAddData}
         */
        void addFailed(ManagedLedgerException exception, Object context);
    }

    private static class DisabledBatchCallback
    implements AsyncCallbacks.AddEntryCallback {
        private static final DisabledBatchCallback INSTANCE = new DisabledBatchCallback();

        private DisabledBatchCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
            AsyncAddArgs asyncAddArgs = (AsyncAddArgs)ctx;
            try {
                asyncAddArgs.callback.addComplete(position, asyncAddArgs.ctx);
            }
            finally {
                asyncAddArgs.recycle();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void addFailed(ManagedLedgerException exception, Object ctx) {
            AsyncAddArgs asyncAddArgs = (AsyncAddArgs)ctx;
            try {
                asyncAddArgs.callback.addFailed(exception, asyncAddArgs.ctx);
            }
            finally {
                asyncAddArgs.recycle();
            }
        }
    }

    private static class AsyncAddArgs {
        private static final Recycler<AsyncAddArgs> ASYNC_ADD_ARGS_RECYCLER = new Recycler<AsyncAddArgs>(){

            protected AsyncAddArgs newObject(Recycler.Handle<AsyncAddArgs> handle) {
                return new AsyncAddArgs(handle);
            }
        };
        private final Recycler.Handle<AsyncAddArgs> handle;
        private AddDataCallback callback;
        private Object ctx;
        private long addedTime;
        private ByteBuf byteBuf;

        private static AsyncAddArgs newInstance(AddDataCallback callback, Object ctx, long addedTime) {
            AsyncAddArgs asyncAddArgs = (AsyncAddArgs)ASYNC_ADD_ARGS_RECYCLER.get();
            asyncAddArgs.callback = callback;
            asyncAddArgs.ctx = ctx;
            asyncAddArgs.addedTime = addedTime;
            return asyncAddArgs;
        }

        private static AsyncAddArgs newInstance(AddDataCallback callback, Object ctx, long addedTime, ByteBuf byteBuf) {
            AsyncAddArgs asyncAddArgs = AsyncAddArgs.newInstance(callback, ctx, addedTime);
            asyncAddArgs.byteBuf = byteBuf;
            return asyncAddArgs;
        }

        private AsyncAddArgs(Recycler.Handle<AsyncAddArgs> handle) {
            this.handle = handle;
        }

        public void recycle() {
            this.callback = null;
            this.ctx = null;
            this.addedTime = 0L;
            if (this.byteBuf != null) {
                this.byteBuf.release();
                this.byteBuf = null;
            }
            this.handle.recycle((Object)this);
        }

        public String toString() {
            return "TxnLogBufferedWriter.AsyncAddArgs(handle=" + this.handle + ", callback=" + this.getCallback() + ", ctx=" + this.getCtx() + ", addedTime=" + this.getAddedTime() + ", byteBuf=" + this.byteBuf + ")";
        }

        public AddDataCallback getCallback() {
            return this.callback;
        }

        public Object getCtx() {
            return this.ctx;
        }

        public long getAddedTime() {
            return this.addedTime;
        }
    }
}

