package io.camunda.zeebe.broker.system.partitions.impl;

import io.atomix.raft.RaftCommittedEntryListener;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/AsyncSnapshotDirector.class */
public final class AsyncSnapshotDirector extends Actor implements RaftCommittedEntryListener, HealthMonitorable {
    public static final Duration MINIMUM_SNAPSHOT_PERIOD = Duration.ofMinutes(1);
    private static final Logger LOG = Loggers.SNAPSHOT_LOGGER;
    private static final String LOG_MSG_WAIT_UNTIL_COMMITTED = "Finished taking temporary snapshot, need to wait until last written event position {} is committed, current commit position is {}. After that snapshot will be committed.";
    private static final String ERROR_MSG_ON_RESOLVE_PROCESSED_POS = "Unexpected error in resolving last processed position.";
    private static final String ERROR_MSG_ON_RESOLVE_WRITTEN_POS = "Unexpected error in resolving last written position.";
    private static final String ERROR_MSG_MOVE_SNAPSHOT = "Unexpected exception occurred on moving valid snapshot.";
    private final StateController stateController;
    private final Duration snapshotRate;
    private final String processorName;
    private final StreamProcessor streamProcessor;
    private final String actorName;
    private final StreamProcessorMode streamProcessorMode;
    private final BooleanSupplier isLastWrittenPositionCommitted;
    private Long lastWrittenPosition;
    private TransientSnapshot pendingSnapshot;
    private long lowerBoundSnapshotPosition;
    private CompletableActorFuture<PersistedSnapshot> snapshotFuture;
    private boolean persistingSnapshot;
    private long commitPosition;
    private final int partitionId;
    private final Set<FailureListener> listeners = new HashSet();
    private volatile HealthReport healthReport = HealthReport.healthy(this);

    private AsyncSnapshotDirector(int i, int i2, StreamProcessor streamProcessor, StateController stateController, Duration duration, StreamProcessorMode streamProcessorMode) {
        this.streamProcessor = streamProcessor;
        this.stateController = stateController;
        this.processorName = streamProcessor.getName();
        this.snapshotRate = duration;
        this.partitionId = i2;
        this.actorName = buildActorName(i, "SnapshotDirector", this.partitionId);
        this.streamProcessorMode = streamProcessorMode;
        if (streamProcessorMode == StreamProcessorMode.REPLAY) {
            this.isLastWrittenPositionCommitted = () -> {
                return true;
            };
        } else {
            this.isLastWrittenPositionCommitted = () -> {
                return this.lastWrittenPosition.longValue() <= this.commitPosition;
            };
        }
    }

    protected Map<String, String> createContext() {
        Map<String, String> createContext = super.createContext();
        createContext.put("partitionId", Integer.toString(this.partitionId));
        return createContext;
    }

    public String getName() {
        return this.actorName;
    }

    protected void onActorStarting() {
        this.actor.runDelayed(RandomDuration.getRandomDurationMinuteBased(MINIMUM_SNAPSHOT_PERIOD, this.snapshotRate), this::scheduleSnapshotOnRate);
        this.lastWrittenPosition = null;
    }

    public ActorFuture<Void> closeAsync() {
        return this.actor.isClosed() ? CompletableActorFuture.completed((Object) null) : super.closeAsync();
    }

    protected void handleFailure(Throwable th) {
        LOG.error("No snapshot was taken due to failure in '{}'. Will try to take snapshot after snapshot period {}. {}", new Object[]{this.actorName, this.snapshotRate, th});
        resetStateOnFailure(th);
        this.healthReport = HealthReport.unhealthy(this).withIssue(th);
        Iterator<FailureListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onFailure(this.healthReport);
        }
    }

    public static AsyncSnapshotDirector ofReplayMode(int i, int i2, StreamProcessor streamProcessor, StateController stateController, Duration duration) {
        return new AsyncSnapshotDirector(i, i2, streamProcessor, stateController, duration, StreamProcessorMode.REPLAY);
    }

    public static AsyncSnapshotDirector ofProcessingMode(int i, int i2, StreamProcessor streamProcessor, StateController stateController, Duration duration) {
        return new AsyncSnapshotDirector(i, i2, streamProcessor, stateController, duration, StreamProcessorMode.PROCESSING);
    }

    private void scheduleSnapshotOnRate() {
        this.actor.runAtFixedRate(this.snapshotRate, () -> {
            prepareTakingSnapshot(new CompletableActorFuture<>());
        });
        prepareTakingSnapshot(new CompletableActorFuture<>());
    }

    public CompletableActorFuture<PersistedSnapshot> forceSnapshot() {
        CompletableActorFuture<PersistedSnapshot> completableActorFuture = new CompletableActorFuture<>();
        this.actor.call(() -> {
            prepareTakingSnapshot(completableActorFuture);
        });
        return completableActorFuture;
    }

    public HealthReport getHealthReport() {
        return this.healthReport;
    }

    public void addFailureListener(FailureListener failureListener) {
        this.actor.run(() -> {
            this.listeners.add(failureListener);
        });
    }

    public void removeFailureListener(FailureListener failureListener) {
        this.actor.run(() -> {
            this.listeners.remove(failureListener);
        });
    }

    private void prepareTakingSnapshot(CompletableActorFuture<PersistedSnapshot> completableActorFuture) {
        if (this.snapshotFuture != null) {
            LOG.debug("Already taking snapshot, skipping this request for a new snapshot");
            completableActorFuture.complete((Object) null);
        } else {
            this.snapshotFuture = completableActorFuture;
            this.actor.runOnCompletion(this.streamProcessor.getLastProcessedPositionAsync(), (l, th) -> {
                if (th != null) {
                    LOG.error(ERROR_MSG_ON_RESOLVE_PROCESSED_POS, th);
                    this.snapshotFuture.completeExceptionally(th);
                    this.snapshotFuture = null;
                } else if (l.longValue() != -1) {
                    this.lowerBoundSnapshotPosition = l.longValue();
                    takeSnapshot();
                } else {
                    LOG.debug("We will skip taking this snapshot, because we haven't processed something yet.");
                    this.snapshotFuture.complete((Object) null);
                    this.snapshotFuture = null;
                }
            });
        }
    }

    private void takeSnapshot() {
        this.stateController.takeTransientSnapshot(this.lowerBoundSnapshotPosition).onComplete((transientSnapshot, th) -> {
            if (th == null) {
                onTransientSnapshotTaken(transientSnapshot);
            } else {
                logSnapshotTakenError(th);
                resetStateOnFailure(th);
            }
        });
    }

    private void logSnapshotTakenError(Throwable th) {
        if (th instanceof SnapshotException.SnapshotAlreadyExistsException) {
            LOG.debug("Did not take a snapshot. {}", th.getMessage());
        } else if ((th instanceof NoEntryAtSnapshotPosition) && this.streamProcessorMode == StreamProcessorMode.REPLAY) {
            LOG.debug("Did not take a snapshot: {}. Most likely this partition has not received the entry yet. Will retry in {}", th.getMessage(), this.snapshotRate);
        } else {
            LOG.error("Failed to take a snapshot for {}", this.processorName, th);
        }
    }

    private void onTransientSnapshotTaken(TransientSnapshot transientSnapshot) {
        this.pendingSnapshot = transientSnapshot;
        onRecovered();
        this.actor.runOnCompletion(this.streamProcessor.getLastWrittenPositionAsync(), this::onLastWrittenPositionReceived);
    }

    private void onLastWrittenPositionReceived(Long l, Throwable th) {
        if (th != null) {
            resetStateOnFailure(th);
            LOG.error(ERROR_MSG_ON_RESOLVE_WRITTEN_POS, th);
        } else {
            LOG.info(LOG_MSG_WAIT_UNTIL_COMMITTED, l, Long.valueOf(this.commitPosition));
            this.lastWrittenPosition = l;
            this.persistingSnapshot = false;
            persistSnapshotIfLastWrittenPositionCommitted();
        }
    }

    private void onRecovered() {
        if (this.healthReport.isHealthy()) {
            return;
        }
        this.healthReport = HealthReport.healthy(this);
        this.listeners.forEach((v0) -> {
            v0.onRecovered();
        });
    }

    public void onCommit(IndexedRaftLogEntry indexedRaftLogEntry) {
        if (indexedRaftLogEntry.isApplicationEntry()) {
            newPositionCommitted(indexedRaftLogEntry.getApplicationEntry().highestPosition());
        }
    }

    public void newPositionCommitted(long j) {
        this.actor.run(() -> {
            this.commitPosition = j;
            persistSnapshotIfLastWrittenPositionCommitted();
        });
    }

    private void persistSnapshotIfLastWrittenPositionCommitted() {
        if (this.pendingSnapshot == null || this.lastWrittenPosition == null || !this.isLastWrittenPositionCommitted.getAsBoolean() || this.persistingSnapshot) {
            return;
        }
        this.persistingSnapshot = true;
        LOG.debug("Current commit position {} >= {}, committing snapshot {}.", new Object[]{Long.valueOf(this.commitPosition), this.lastWrittenPosition, this.pendingSnapshot});
        this.pendingSnapshot.withLastFollowupEventPosition(this.lastWrittenPosition.longValue()).persist().onComplete((persistedSnapshot, th) -> {
            if (th != null) {
                this.snapshotFuture.completeExceptionally(th);
                if (th instanceof SnapshotException.SnapshotNotFoundException) {
                    LOG.warn("Failed to persist transient snapshot {}. Nothing to worry if a newer snapshot exists.", this.pendingSnapshot, th);
                } else {
                    LOG.error(ERROR_MSG_MOVE_SNAPSHOT, th);
                }
            } else {
                this.snapshotFuture.complete(persistedSnapshot);
            }
            this.lastWrittenPosition = null;
            this.snapshotFuture = null;
            this.pendingSnapshot = null;
            this.persistingSnapshot = false;
        });
    }

    private void resetStateOnFailure(Throwable th) {
        if (this.snapshotFuture != null && !this.snapshotFuture.isDone()) {
            this.snapshotFuture.completeExceptionally(th);
        }
        this.lastWrittenPosition = null;
        this.snapshotFuture = null;
        if (this.pendingSnapshot != null) {
            this.pendingSnapshot.abort();
            this.pendingSnapshot = null;
        }
    }
}
