package io.camunda.zeebe.broker.system.partitions.impl;

import io.atomix.raft.RaftCommittedEntryListener;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.stream.impl.StreamProcessor;
import io.camunda.zeebe.stream.impl.StreamProcessorMode;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/AsyncSnapshotDirector.class */
public final class AsyncSnapshotDirector extends Actor implements RaftCommittedEntryListener, HealthMonitorable {
    /** First argument handed to {@code RandomDuration.getRandomDurationMinuteBased} when the initial snapshot is scheduled. */
    public static final Duration MINIMUM_SNAPSHOT_PERIOD = Duration.ofMinutes(1);
    private static final Logger LOG = Loggers.SNAPSHOT_LOGGER;
    private static final String LOG_MSG_WAIT_UNTIL_COMMITTED = "Finished taking temporary snapshot, need to wait until last written event position {} is committed, current commit position is {}. After that snapshot will be committed.";
    private static final String ERROR_MSG_ON_RESOLVE_PROCESSED_POS = "Unexpected error in resolving last processed position.";
    private static final String ERROR_MSG_ON_RESOLVE_WRITTEN_POS = "Unexpected error in resolving last written position.";
    private static final String ERROR_MSG_MOVE_SNAPSHOT = "Unexpected exception occurred on moving valid snapshot.";

    /** Used to take the transient snapshot at a given lower-bound position. */
    private final StateController stateController;
    /** Period passed to {@code actor.runAtFixedRate} for recurring snapshot attempts. */
    private final Duration snapshotRate;
    /** Name of the stream processor; only used in the "failed to take a snapshot" log message. */
    private final String processorName;
    /** Source of the last processed and last written positions. */
    private final StreamProcessor streamProcessor;
    /** Value returned by {@link #getName()}. */
    private final String actorName;
    /** In {@code REPLAY} mode the director does not wait for the last written position to be committed. */
    private final StreamProcessorMode streamProcessorMode;
    /** Invoked before a transient snapshot is persisted; the returned future signals the flush outcome. */
    private final Callable<CompletableFuture<Void>> flushLog;
    /** Partition id; published in the actor context under the key {@code "partitionId"}. */
    private final int partitionId;
    /** Non-null while a snapshot attempt is in progress; reset to {@code null} once that attempt completes or fails. */
    private CompletableActorFuture<PersistedSnapshot> ongoingSnapshotFuture;
    /** Most recent position reported through {@link #newPositionCommitted(long)}. */
    private long commitPosition;
    /** Listeners notified when the director turns unhealthy or recovers. */
    private final Set<FailureListener> listeners = new HashSet<>();
    /** Futures keyed by the position they wait for; completed once the commit position reaches the key. */
    private final TreeMap<Long, ActorFuture<Void>> commitAwaiters = new TreeMap<>();
    /** Current health; volatile because {@link #getHealthReport()} reads it without going through the actor. */
    private volatile HealthReport healthReport = HealthReport.healthy(this);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/broker/system/partitions/impl/AsyncSnapshotDirector$InProgressSnapshot.class */
    /**
     * Mutable holder for the values collected while a single snapshot attempt
     * moves through its stages. Only the enclosing class instantiates it.
     */
    public static class InProgressSnapshot {
        /** Last processed position resolved at the start of the attempt; passed on when taking the transient snapshot. */
        private long lowerBoundSnapshotPosition;
        /** The transient snapshot, once it has been taken; {@code null} before that. */
        private TransientSnapshot pendingSnapshot;
        /** Last written position resolved after the transient snapshot was taken. */
        private long lastWrittenPosition;

        private InProgressSnapshot() {
            // instantiated by the enclosing class only
        }
    }

    /**
     * Creates a director; callers go through {@code ofReplayMode} / {@code ofProcessingMode}.
     *
     * @param partitionId    partition this director belongs to; also part of the actor name
     * @param processor      stream processor queried for processed/written positions
     * @param controller     state controller used to take transient snapshots
     * @param rate           period between scheduled snapshot attempts
     * @param mode           replay or processing mode
     * @param journalFlusher callable invoked before a snapshot is persisted
     */
    private AsyncSnapshotDirector(int partitionId, StreamProcessor processor, StateController controller, Duration rate, StreamProcessorMode mode, Callable<CompletableFuture<Void>> journalFlusher) {
        this.partitionId = partitionId;
        this.streamProcessorMode = mode;
        this.snapshotRate = rate;
        this.flushLog = journalFlusher;
        this.stateController = controller;
        this.streamProcessor = processor;
        this.processorName = processor.getName();
        this.actorName = buildActorName("SnapshotDirector", partitionId);
    }

    /**
     * Extends the context map produced by the superclass with this director's
     * partition id under the key {@code "partitionId"}.
     *
     * @return the superclass' context map with the extra entry added
     */
    protected Map<String, String> createContext() {
        final Map<String, String> context = super.createContext();
        final String partition = Integer.toString(partitionId);
        context.put("partitionId", partition);
        return context;
    }

    /**
     * @return the actor name built in the constructor from {@code "SnapshotDirector"} and the partition id
     */
    public String getName() {
        return actorName;
    }

    /**
     * Schedules the start of the periodic snapshotting. The initial delay is obtained
     * from {@code RandomDuration.getRandomDurationMinuteBased(MINIMUM_SNAPSHOT_PERIOD, snapshotRate)}.
     */
    protected void onActorStarting() {
        final Duration initialDelay = RandomDuration.getRandomDurationMinuteBased(MINIMUM_SNAPSHOT_PERIOD, snapshotRate);
        actor.schedule(initialDelay, this::scheduleSnapshotOnRate);
    }

    /**
     * Closes the actor unless it already reports itself as closed.
     *
     * @return an already completed future when the actor is closed, otherwise the
     *     future returned by the superclass' {@code closeAsync()}
     */
    public ActorFuture<Void> closeAsync() {
        if (actor.isClosed()) {
            return CompletableActorFuture.completed(null);
        }
        return super.closeAsync();
    }

    /**
     * Reacts to a failure: logs it, fails and clears any ongoing snapshot future,
     * marks this component unhealthy and informs every registered failure listener.
     *
     * @param th the failure that occurred
     */
    protected void handleFailure(Throwable th) {
        LOG.error("No snapshot was taken due to failure in '{}'. Will try to take snapshot after snapshot period {}. {}", actorName, snapshotRate, th);
        resetStateOnFailure(th);
        healthReport = HealthReport.unhealthy(this).withIssue(th);
        for (final FailureListener listener : listeners) {
            listener.onFailure(healthReport);
        }
    }

    /**
     * Factory for a director whose stream processor mode is {@code REPLAY}.
     *
     * @param i               not used by this factory
     * @param i2              partition id
     * @param streamProcessor stream processor queried for positions
     * @param stateController controller used to take transient snapshots
     * @param duration        period between scheduled snapshot attempts
     * @param callable        journal flush invoked before a snapshot is persisted
     * @return a new, not yet started director
     */
    public static AsyncSnapshotDirector ofReplayMode(int i, int i2, StreamProcessor streamProcessor, StateController stateController, Duration duration, Callable<CompletableFuture<Void>> callable) {
        final StreamProcessorMode mode = StreamProcessorMode.REPLAY;
        return new AsyncSnapshotDirector(i2, streamProcessor, stateController, duration, mode, callable);
    }

    /**
     * Factory for a director whose stream processor mode is {@code PROCESSING}.
     *
     * @param i               not used by this factory
     * @param i2              partition id
     * @param streamProcessor stream processor queried for positions
     * @param stateController controller used to take transient snapshots
     * @param duration        period between scheduled snapshot attempts
     * @param callable        journal flush invoked before a snapshot is persisted
     * @return a new, not yet started director
     */
    public static AsyncSnapshotDirector ofProcessingMode(int i, int i2, StreamProcessor streamProcessor, StateController stateController, Duration duration, Callable<CompletableFuture<Void>> callable) {
        final StreamProcessorMode mode = StreamProcessorMode.PROCESSING;
        return new AsyncSnapshotDirector(i2, streamProcessor, stateController, duration, mode, callable);
    }

    /**
     * Registers the recurring snapshot attempt at {@code snapshotRate} and then
     * triggers one attempt right away.
     */
    private void scheduleSnapshotOnRate() {
        // register the recurring job first, then run the immediate attempt
        actor.runAtFixedRate(snapshotRate, this::trySnapshot);
        trySnapshot();
    }

    /**
     * Requests a snapshot attempt outside of the regular schedule. The attempt is
     * submitted to the actor via {@code actor.call}.
     *
     * @return a future completed with the outcome of {@code trySnapshot()}; that outcome is
     *     {@code null} when a snapshot is already in progress or nothing was processed yet
     */
    public CompletableActorFuture<PersistedSnapshot> forceSnapshot() {
        final CompletableActorFuture<PersistedSnapshot> result = new CompletableActorFuture<>();
        actor.call(() -> {
            final ActorFuture<PersistedSnapshot> attempt = trySnapshot();
            attempt.onComplete(result);
        });
        return result;
    }

    /**
     * @return the most recently stored health report (read from a volatile field)
     */
    public HealthReport getHealthReport() {
        return healthReport;
    }

    /**
     * Registers a listener for failure/recovery notifications. The registration is
     * submitted through {@code actor.run}.
     *
     * @param failureListener listener to add
     */
    public void addFailureListener(FailureListener failureListener) {
        actor.run(() -> {
            listeners.add(failureListener);
        });
    }

    /**
     * Unregisters a previously added listener. The removal is submitted through
     * {@code actor.run}.
     *
     * @param failureListener listener to remove
     */
    public void removeFailureListener(FailureListener failureListener) {
        actor.run(() -> {
            listeners.remove(failureListener);
        });
    }

    /**
     * Starts a snapshot attempt unless one is already in progress.
     *
     * <p>The attempt first resolves the last processed position. A position of {@code -1}
     * ends the attempt with a {@code null} result; otherwise the position becomes the
     * lower bound of the snapshot and the staged {@code snapshot(...)} pipeline runs.
     *
     * @return a future for this attempt, or an already completed {@code null} future when
     *     another attempt is still in progress
     */
    private ActorFuture<PersistedSnapshot> trySnapshot() {
        if (ongoingSnapshotFuture != null) {
            LOG.debug("Already taking snapshot, skipping this request for a new snapshot");
            return CompletableActorFuture.completed(null);
        }

        final CompletableActorFuture<PersistedSnapshot> attemptResult = new CompletableActorFuture<>();
        ongoingSnapshotFuture = attemptResult;
        final InProgressSnapshot attempt = new InProgressSnapshot();

        streamProcessor.getLastProcessedPositionAsync().onComplete((processedPosition, error) -> {
            if (error != null) {
                LOG.error(ERROR_MSG_ON_RESOLVE_PROCESSED_POS, error);
                attemptResult.completeExceptionally(error);
                return;
            }
            final long position = processedPosition.longValue();
            if (position == -1) {
                LOG.debug("We will skip taking this snapshot, because we haven't processed anything yet.");
                attemptResult.complete(null);
                return;
            }
            attempt.lowerBoundSnapshotPosition = position;
            snapshot(attempt).onComplete(attemptResult);
        });

        // Whatever the outcome: discard a transient snapshot left behind by a failed
        // attempt and allow the next attempt to start.
        attemptResult.onComplete((persisted, error) -> {
            final boolean failedWithPendingSnapshot = error != null && attempt.pendingSnapshot != null;
            if (failedWithPendingSnapshot) {
                attempt.pendingSnapshot.abort();
            }
            ongoingSnapshotFuture = null;
        });
        return attemptResult;
    }

    /**
     * Runs the snapshot stages one after another: take the transient snapshot, resolve
     * the last written position, wait until that position is committed, flush the
     * journal, persist the snapshot. A failure in any stage skips the remaining stages
     * and is forwarded from future to future down to the returned one.
     *
     * @param inProgressSnapshot holder shared by all stages of this attempt
     * @return future completed with the persisted snapshot or with the first failure
     */
    private ActorFuture<PersistedSnapshot> snapshot(InProgressSnapshot inProgressSnapshot) {
        final ActorFuture<Void> transientTaken = actor.createFuture();
        final ActorFuture<Void> writtenPositionResolved = actor.createFuture();
        final ActorFuture<Void> writtenPositionCommitted = actor.createFuture();
        final ActorFuture<Void> journalFlushed = actor.createFuture();
        final ActorFuture<PersistedSnapshot> persisted = actor.createFuture();

        takeTransientSnapshot(inProgressSnapshot).onComplete(transientTaken);

        transientTaken.onComplete(proceed(
                writtenPositionResolved::completeExceptionally,
                () -> getLastWrittenPosition(inProgressSnapshot).onComplete(writtenPositionResolved)));

        writtenPositionResolved.onComplete(proceed(
                writtenPositionCommitted::completeExceptionally,
                () -> waitUntilLastWrittenPositionIsCommitted(inProgressSnapshot).onComplete(writtenPositionCommitted)));

        writtenPositionCommitted.onComplete(proceed(
                journalFlushed::completeExceptionally,
                () -> flushJournal().onComplete(journalFlushed)));

        journalFlushed.onComplete(proceed(
                persisted::completeExceptionally,
                () -> persistSnapshot(inProgressSnapshot).onComplete(persisted)));

        return persisted;
    }

    /**
     * Invokes the configured flush callable and mirrors its outcome into an actor future.
     * An exception thrown by the callable itself is handled the same way as a failed flush.
     *
     * @return future completed with {@code null} on success, exceptionally on failure
     */
    private ActorFuture<Void> flushJournal() {
        final String failureMessage = "Failed to flush journal before committing snapshot";
        final CompletableActorFuture<Void> flushed = new CompletableActorFuture<>();
        try {
            flushLog.call().whenComplete((ignored, error) -> {
                if (error != null) {
                    LOG.warn(failureMessage, error);
                    flushed.completeExceptionally(error);
                    return;
                }
                flushed.complete(null);
            });
        } catch (Exception e) {
            LOG.warn(failureMessage, e);
            flushed.completeExceptionally(e);
        }
        return flushed;
    }

    /**
     * Persists the pending transient snapshot after tagging it with the last written
     * position as its last follow-up event position. Failures are only logged here;
     * the returned future still carries them to the caller.
     *
     * @param inProgressSnapshot holder providing the pending snapshot and last written position
     * @return the future returned by the snapshot's {@code persist()}
     */
    private ActorFuture<PersistedSnapshot> persistSnapshot(InProgressSnapshot inProgressSnapshot) {
        final TransientSnapshot tagged = inProgressSnapshot.pendingSnapshot.withLastFollowupEventPosition(inProgressSnapshot.lastWrittenPosition);
        final ActorFuture<PersistedSnapshot> persistResult = tagged.persist();
        persistResult.onComplete((persisted, error) -> {
            if (error == null) {
                return;
            }
            if (error instanceof SnapshotException.SnapshotNotFoundException) {
                LOG.warn("Failed to persist transient snapshot {}. Nothing to worry if a newer snapshot exists.", inProgressSnapshot.pendingSnapshot, error);
                return;
            }
            LOG.error(ERROR_MSG_MOVE_SNAPSHOT, error);
        });
        return persistResult;
    }

    /**
     * Asks the stream processor for its last written position and stores the answer
     * in the given holder.
     *
     * @param inProgressSnapshot holder receiving {@code lastWrittenPosition}
     * @return future completed with {@code null} once stored, exceptionally if the lookup failed
     */
    private ActorFuture<Void> getLastWrittenPosition(InProgressSnapshot inProgressSnapshot) {
        final CompletableActorFuture<Void> resolved = new CompletableActorFuture<>();
        streamProcessor.getLastWrittenPositionAsync().onComplete((writtenPosition, error) -> {
            if (error == null) {
                inProgressSnapshot.lastWrittenPosition = writtenPosition.longValue();
                resolved.complete(null);
                return;
            }
            LOG.error(ERROR_MSG_ON_RESOLVE_WRITTEN_POS, error);
            resolved.completeExceptionally(error);
        });
        return resolved;
    }

    /**
     * Returns a future that completes once the attempt's last written position is committed.
     * No waiting happens in {@code REPLAY} mode or when the known commit position already
     * covers the last written position; otherwise a future is registered (or reused) in
     * {@code commitAwaiters} under that position.
     *
     * @param inProgressSnapshot holder providing the last written position
     * @return an already completed future, or the awaiter registered for the position
     */
    private ActorFuture<Void> waitUntilLastWrittenPositionIsCommitted(InProgressSnapshot inProgressSnapshot) {
        final long awaitedPosition = inProgressSnapshot.lastWrittenPosition;
        final boolean replaying = streamProcessorMode == StreamProcessorMode.REPLAY;
        if (replaying || commitPosition >= awaitedPosition) {
            return CompletableActorFuture.completed(null);
        }

        LOG.info(LOG_MSG_WAIT_UNTIL_COMMITTED, awaitedPosition, commitPosition);
        return commitAwaiters.computeIfAbsent(awaitedPosition, position -> new CompletableActorFuture<Void>());
    }

    /**
     * Asks the state controller for a transient snapshot at the attempt's lower-bound
     * position. On success the snapshot is stored in the holder and the health state is
     * reset to healthy if it was not; on failure the error is logged and forwarded.
     *
     * @param inProgressSnapshot holder providing the lower bound and receiving the snapshot
     * @return future completed with {@code null} on success, exceptionally on failure
     */
    private ActorFuture<Void> takeTransientSnapshot(InProgressSnapshot inProgressSnapshot) {
        final CompletableActorFuture<Void> taken = new CompletableActorFuture<>();
        final long lowerBound = inProgressSnapshot.lowerBoundSnapshotPosition;
        stateController.takeTransientSnapshot(lowerBound).onComplete((snapshotTaken, error) -> {
            if (error == null) {
                inProgressSnapshot.pendingSnapshot = snapshotTaken;
                taken.complete(null);
                onRecovered();
                return;
            }
            logSnapshotTakenError(error);
            taken.completeExceptionally(error);
        });
        return taken;
    }

    /**
     * Logs a failed attempt to take a transient snapshot. An already existing snapshot,
     * and a missing entry at the snapshot position while in {@code REPLAY} mode, are
     * logged at debug level; everything else is logged as an error.
     *
     * @param th the failure reported when taking the snapshot
     */
    void logSnapshotTakenError(Throwable th) {
        if (th instanceof SnapshotException.SnapshotAlreadyExistsException) {
            LOG.debug("Did not take a snapshot. {}", th.getMessage());
            return;
        }

        final boolean missingEntryDuringReplay = (th instanceof NoEntryAtSnapshotPosition) && streamProcessorMode == StreamProcessorMode.REPLAY;
        if (missingEntryDuringReplay) {
            LOG.debug("Did not take a snapshot: {}. Most likely this partition has not received the entry yet. Will retry in {}", th.getMessage(), snapshotRate);
            return;
        }

        LOG.error("Failed to take a snapshot for {}", processorName, th);
    }

    /**
     * Switches back to a healthy report and notifies all listeners, but only when the
     * current report is not already healthy.
     */
    private void onRecovered() {
        if (!healthReport.isHealthy()) {
            healthReport = HealthReport.healthy(this);
            listeners.forEach(FailureListener::onRecovered);
        }
    }

    /**
     * Forwards the highest position of a committed application entry to
     * {@link #newPositionCommitted(long)}; entries that are not application entries are ignored.
     *
     * @param indexedRaftLogEntry the committed log entry
     */
    public void onCommit(IndexedRaftLogEntry indexedRaftLogEntry) {
        if (!indexedRaftLogEntry.isApplicationEntry()) {
            return;
        }
        final long highestPosition = indexedRaftLogEntry.getApplicationEntry().highestPosition();
        newPositionCommitted(highestPosition);
    }

    /**
     * Records a new commit position and completes every awaiter registered for a position
     * at or below it. The work is submitted through {@code actor.run}.
     *
     * @param j the newly committed position
     */
    public void newPositionCommitted(long j) {
        actor.run(() -> {
            commitPosition = j;
            // inclusive head view: all awaiters whose position is <= the commit position
            final NavigableMap<Long, ActorFuture<Void>> reached = commitAwaiters.headMap(commitPosition, true);
            reached.forEach((position, awaiter) -> awaiter.complete(null));
            // clearing the view removes the completed awaiters from commitAwaiters
            reached.clear();
        });
    }

    /**
     * Fails the ongoing snapshot future with the given error if it is still pending,
     * then clears the reference to it.
     *
     * @param th the failure to complete the pending future with
     */
    private void resetStateOnFailure(Throwable th) {
        final CompletableActorFuture<PersistedSnapshot> pending = ongoingSnapshotFuture;
        final boolean stillRunning = pending != null && !pending.isDone();
        if (stillRunning) {
            pending.completeExceptionally(th);
        }
        ongoingSnapshotFuture = null;
    }

    /**
     * Builds a completion callback that either continues with the next step or reports
     * the failure of the previous one.
     *
     * @param consumer receives the error when the previous step failed
     * @param runnable executed when the previous step completed without error
     * @return callback suitable for a {@code Void} future's completion
     */
    private BiConsumer<Void, Throwable> proceed(Consumer<Throwable> consumer, Runnable runnable) {
        return (ignored, error) -> {
            if (error == null) {
                runnable.run();
                return;
            }
            consumer.accept(error);
        };
    }
}
