package io.zeebe.logstreams.impl;

import io.zeebe.logstreams.impl.LogStreamImpl;
import io.zeebe.logstreams.impl.log.index.LogBlockIndex;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.logstreams.spi.ReadableSnapshot;
import io.zeebe.logstreams.spi.SnapshotPolicy;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.logstreams.spi.SnapshotWriter;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.actor.ActorReference;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.allocation.AllocatedBuffer;
import io.zeebe.util.allocation.BufferAllocators;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.TransitionState;
import io.zeebe.util.state.WaitState;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.Position;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexController.class */
public class LogBlockIndexController implements Actor {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    public static final float DEFAULT_DEVIATION = 0.1f;
    protected static final int TRANSITION_SNAPSHOT = 3;
    protected static final int TRANSITION_TRUNCATE = 4;
    protected static final int TRANSITION_CREATE = 5;
    protected final OpeningState openingState;
    protected final OpenState openState;
    protected final SnapshottingState snapshottingState;
    protected final ClosedState closedState;
    protected final TruncateState truncateState;
    protected final BlockIndexCreationState blockIndexCreationState;
    protected final LogStateMachineAgent stateMachine;
    protected final String name;
    protected final LogStorage logStorage;
    protected final LogBlockIndex blockIndex;
    protected final ActorScheduler actorScheduler;
    protected ActorReference actorRef;
    protected final int indexBlockSize;
    protected final float deviation;
    protected final SnapshotStorage snapshotStorage;
    protected final SnapshotPolicy snapshotPolicy;
    protected final UnsafeBuffer buffer;
    protected final CompleteEventsInBlockProcessor readResultProcessor;
    protected final Runnable openStateRunnable;
    protected final Runnable closedStateRunnable;
    protected long nextAddress;
    protected int bufferSize;
    protected ByteBuffer ioBuffer;
    protected AllocatedBuffer allocatedBuffer;
    protected CompletableFuture<Void> truncateFuture;
    protected Position commitPosition;

    /* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexController$BlockIndexCreationState.class */
    private class BlockIndexCreationState implements State<LogContext> {
        private BlockIndexCreationState() {
        }

        public int doWork(LogContext logContext) throws Exception {
            int i = 0;
            if (LogBlockIndexController.this.readResultProcessor.getLastReadEventPosition() <= LogBlockIndexController.this.getCommitPosition()) {
                createBlockIdx(logContext, logContext.getCurrentBlockAddress());
                LogBlockIndexController.this.ioBuffer.clear();
                logContext.resetCurrentBlockAddress();
                i = 1;
            }
            return i;
        }

        private void createBlockIdx(LogContext logContext, long j) {
            long firstEventPosition = logContext.getFirstEventPosition();
            LogBlockIndexController.this.blockIndex.addBlock(firstEventPosition, j);
            if (LogBlockIndexController.this.snapshotPolicy.apply(firstEventPosition)) {
                logContext.take(LogBlockIndexController.TRANSITION_SNAPSHOT);
            } else {
                logContext.resetLastPosition();
                logContext.take(0);
            }
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexController$ClosedState.class */
    protected class ClosedState implements WaitState<LogContext> {
        protected ClosedState() {
        }

        public void work(LogContext logContext) throws Exception {
            LogBlockIndexController.this.getStateMachine().closing();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexController$OpenState.class */
    public class OpenState implements State<LogContext> {
        private int currentBlockSize = 0;

        protected OpenState() {
        }

        public int doWork(LogContext logContext) {
            int i = 0;
            if (LogBlockIndexController.this.nextAddress != -1) {
                long j = LogBlockIndexController.this.nextAddress;
                long read = LogBlockIndexController.this.logStorage.read(LogBlockIndexController.this.ioBuffer, j, LogBlockIndexController.this.readResultProcessor);
                if (read > j) {
                    tryToCreateBlockIndex(logContext, j);
                    LogBlockIndexController.this.nextAddress = read;
                    i = 1;
                } else if (read == -1) {
                    LogBlockIndexController.LOG.error("Can't read from illegal address: {}", Long.valueOf(j));
                } else if (read == -3) {
                    increaseBufferSize();
                    i = 1;
                }
            } else {
                LogBlockIndexController.this.nextAddress = resolveLastValidAddress();
            }
            return i;
        }

        private long resolveLastValidAddress() {
            long j = 0;
            if (LogBlockIndexController.this.blockIndex.size() > 0) {
                j = LogBlockIndexController.this.blockIndex.getAddress(LogBlockIndexController.this.blockIndex.size());
            }
            if (j <= 0) {
                j = LogBlockIndexController.this.logStorage.getFirstBlockAddress();
            }
            return j;
        }

        private void tryToCreateBlockIndex(LogContext logContext, long j) {
            if (!logContext.hasCurrentBlockAddress()) {
                logContext.setCurrentBlockAddress(j);
                logContext.setFirstEventPosition(LogEntryDescriptor.getPosition(LogBlockIndexController.this.buffer, 0));
            }
            this.currentBlockSize += LogBlockIndexController.this.ioBuffer.position();
            if (this.currentBlockSize < LogBlockIndexController.this.indexBlockSize) {
                LogBlockIndexController.this.ioBuffer.clear();
            } else {
                logContext.take(LogBlockIndexController.TRANSITION_CREATE);
                this.currentBlockSize = 0;
            }
        }

        private void increaseBufferSize() {
            LogBlockIndexController.this.bufferSize *= 2;
            LogBlockIndexController.this.allocatedBuffer.close();
            LogBlockIndexController.this.allocatedBuffer = BufferAllocators.allocateDirect(LogBlockIndexController.this.bufferSize);
            LogBlockIndexController.this.ioBuffer = LogBlockIndexController.this.allocatedBuffer.getRawBuffer();
            LogBlockIndexController.this.buffer.wrap(LogBlockIndexController.this.ioBuffer);
        }

        public void reset(LogContext logContext) {
            this.currentBlockSize = 0;
            LogBlockIndexController.this.nextAddress = logContext.getCurrentBlockAddress();
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexController$OpeningState.class */
    protected class OpeningState implements TransitionState<LogContext> {
        protected OpeningState() {
        }

        public void work(LogContext logContext) {
            try {
                LogBlockIndexController.this.allocatedBuffer = BufferAllocators.allocateDirect(LogBlockIndexController.this.bufferSize);
                LogBlockIndexController.this.ioBuffer = LogBlockIndexController.this.allocatedBuffer.getRawBuffer();
                LogBlockIndexController.this.buffer.wrap(LogBlockIndexController.this.ioBuffer);
                if (!LogBlockIndexController.this.logStorage.isOpen()) {
                    LogBlockIndexController.this.logStorage.open();
                }
                recoverBlockIndex();
            } catch (Exception e) {
                LogBlockIndexController.this.nextAddress = LogBlockIndexController.this.logStorage.getFirstBlockAddress();
            } finally {
                logContext.take(0);
                LogBlockIndexController.this.stateMachine.completeOpenFuture(null);
            }
        }

        protected void recoverBlockIndex() throws Exception {
            long firstBlockAddress = LogBlockIndexController.this.logStorage.getFirstBlockAddress();
            ReadableSnapshot lastSnapshot = LogBlockIndexController.this.snapshotStorage.getLastSnapshot(LogBlockIndexController.this.name);
            if (lastSnapshot == null) {
                LogBlockIndexController.this.nextAddress = firstBlockAddress;
                return;
            }
            lastSnapshot.recoverFromSnapshot(LogBlockIndexController.this.blockIndex);
            LogBlockIndexController.this.nextAddress = Math.max(LogBlockIndexController.this.blockIndex.lookupBlockAddress(lastSnapshot.getPosition()), firstBlockAddress);
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexController$SnapshottingState.class */
    protected class SnapshottingState implements TransitionState<LogContext> {
        protected SnapshottingState() {
        }

        public void work(LogContext logContext) {
            SnapshotWriter snapshotWriter = null;
            try {
                try {
                    LogBlockIndexController.this.logStorage.flush();
                    snapshotWriter = LogBlockIndexController.this.snapshotStorage.createSnapshot(LogBlockIndexController.this.name, logContext.getFirstEventPosition());
                    snapshotWriter.writeSnapshot(LogBlockIndexController.this.blockIndex);
                    snapshotWriter.commit();
                    logContext.setFirstEventPosition(0L);
                    logContext.take(0);
                } catch (Exception e) {
                    LogBlockIndexController.LOG.error("Failed to create snapshot", e);
                    if (snapshotWriter != null) {
                        snapshotWriter.abort();
                    }
                    logContext.setFirstEventPosition(0L);
                    logContext.take(0);
                }
            } catch (Throwable th) {
                logContext.setFirstEventPosition(0L);
                logContext.take(0);
                throw th;
            }
        }
    }

    /* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexController$TruncateState.class */
    private class TruncateState implements State<LogContext> {
        private TruncateState() {
        }

        public int doWork(LogContext logContext) throws Exception {
            try {
                LogBlockIndexController.this.openState.reset(logContext);
                logContext.reset();
                return 0;
            } finally {
                LogBlockIndexController.this.truncateFuture.complete(null);
                LogBlockIndexController.this.truncateFuture = null;
                logContext.take(0);
            }
        }
    }

    public LogBlockIndexController(LogStreamImpl.LogStreamBuilder logStreamBuilder) {
        this(logStreamBuilder, null);
    }

    public LogBlockIndexController(LogStreamImpl.LogStreamBuilder logStreamBuilder, Position position) {
        this.openingState = new OpeningState();
        this.openState = new OpenState();
        this.snapshottingState = new SnapshottingState();
        this.closedState = new ClosedState();
        this.truncateState = new TruncateState();
        this.blockIndexCreationState = new BlockIndexCreationState();
        this.buffer = new UnsafeBuffer(0L, 0);
        this.readResultProcessor = new CompleteEventsInBlockProcessor();
        this.nextAddress = -1L;
        this.name = logStreamBuilder.getLogName();
        this.logStorage = logStreamBuilder.getLogStorage();
        this.blockIndex = logStreamBuilder.getBlockIndex();
        this.actorScheduler = logStreamBuilder.getActorScheduler();
        this.commitPosition = position;
        this.deviation = logStreamBuilder.getDeviation();
        this.indexBlockSize = (int) (logStreamBuilder.getIndexBlockSize() * (1.0f - this.deviation));
        this.snapshotStorage = logStreamBuilder.getSnapshotStorage();
        this.snapshotPolicy = logStreamBuilder.getSnapshotPolicy();
        this.bufferSize = logStreamBuilder.getReadBlockSize();
        this.openStateRunnable = () -> {
            this.actorRef = this.actorScheduler.schedule(this);
        };
        this.closedStateRunnable = () -> {
            this.allocatedBuffer.close();
            this.actorRef.close();
        };
        this.stateMachine = new LogStateMachineAgent(StateMachine.builder(stateMachine -> {
            return new LogContext(stateMachine);
        }).initialState(this.closedState).from(this.openingState).take(0).to(this.openState).from(this.openState).take(TRANSITION_TRUNCATE).to(this.truncateState).from(this.truncateState).take(0).to(this.openState).from(this.openState).take(2).to(this.closedState).from(this.closedState).take(1).to(this.openingState).from(this.openState).take(TRANSITION_CREATE).to(this.blockIndexCreationState).from(this.blockIndexCreationState).take(TRANSITION_SNAPSHOT).to(this.snapshottingState).from(this.snapshottingState).take(0).to(this.openState).from(this.blockIndexCreationState).take(TRANSITION_TRUNCATE).to(this.truncateState).from(this.blockIndexCreationState).take(2).to(this.closedState).from(this.blockIndexCreationState).take(0).to(this.openState).build(), this.openStateRunnable, this.closedStateRunnable);
    }

    public int doWork() {
        return getStateMachine().doWork();
    }

    public String name() {
        return this.name;
    }

    protected LogStateMachineAgent getStateMachine() {
        return this.stateMachine;
    }

    public boolean isClosed() {
        return getStateMachine().getCurrentState() == this.closedState;
    }

    public boolean isOpen() {
        return getStateMachine().getCurrentState() == this.openState;
    }

    public boolean isRunning() {
        return getStateMachine().isRunning();
    }

    public boolean isInCreateState() {
        return getStateMachine().getCurrentState() == this.blockIndexCreationState;
    }

    public void open() {
        getStateMachine().open();
    }

    public CompletableFuture<Void> openAsync() {
        return getStateMachine().openAsync();
    }

    public void close() {
        getStateMachine().close();
    }

    public CompletableFuture<Void> closeAsync() {
        return getStateMachine().closeAsync();
    }

    public long getNextAddress() {
        return this.nextAddress;
    }

    public int getIndexBlockSize() {
        return this.indexBlockSize;
    }

    public long getCommitPosition() {
        if (this.commitPosition == null) {
            return -1L;
        }
        return this.commitPosition.get();
    }

    public CompletableFuture<Void> truncate() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        getStateMachine().addCommand(logContext -> {
            if (logContext.tryTake(TRANSITION_TRUNCATE)) {
                this.truncateFuture = completableFuture;
            } else {
                completableFuture.completeExceptionally(new IllegalStateException("Cannot truncate log stream. State is neither open nor create."));
            }
        });
        return completableFuture;
    }
}
