package io.zeebe.broker.clustering.base.snapshots;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.api.ErrorResponse;
import io.zeebe.broker.clustering.api.FetchSnapshotChunkRequest;
import io.zeebe.broker.clustering.api.FetchSnapshotChunkResponse;
import io.zeebe.broker.clustering.api.ListSnapshotsRequest;
import io.zeebe.broker.clustering.api.ListSnapshotsResponse;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.clustering.base.topology.NodeInfo;
import io.zeebe.broker.clustering.base.topology.ReadableTopology;
import io.zeebe.broker.clustering.base.topology.TopologyManager;
import io.zeebe.clustering.management.MessageHeaderDecoder;
import io.zeebe.logstreams.spi.SnapshotWriter;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.ClientTransport;
import io.zeebe.util.StreamUtil;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/base/snapshots/SnapshotReplicationService.class */
/**
 * Actor-based service that replicates snapshots of a partition from the partition's leader.
 *
 * <p>The replication cycle implemented here is:
 *
 * <ol>
 *   <li>query the topology for the leader of the injected partition;
 *   <li>send a {@link ListSnapshotsRequest} to that leader;
 *   <li>queue every listed snapshot which the local snapshot storage reports as missing;
 *   <li>fetch each queued snapshot chunk by chunk ({@link #DEFAULT_CHUNK_LENGTH} bytes requested
 *       per chunk) into a temporary snapshot writer, then validate and commit it;
 *   <li>once the queue is drained, schedule the next poll according to the poll interval.
 * </ol>
 *
 * <p>Failures while listing snapshots restart the cycle from the leader lookup after
 * {@link #ERROR_RETRY_INTERVAL}; failures while replicating a single snapshot abort that snapshot
 * and continue with the next queued one.
 */
public class SnapshotReplicationService extends Actor implements Service<SnapshotReplicationService> {
    /** Chunk length, in bytes, requested from the leader for every fetch request. */
    public static final int DEFAULT_CHUNK_LENGTH = 524288;

    /** Delay before the leader lookup is retried after a failed topology query or list request. */
    public static final Duration ERROR_RETRY_INTERVAL = Duration.ofSeconds(1);

    private static final Logger LOG = Loggers.CLUSTERING_LOGGER;

    /**
     * Message header template id which this service treats as an error response.
     * NOTE(review): presumably the SBE template id of the error response message - confirm
     * against the generated decoder.
     */
    private static final int ERROR_RESPONSE_TEMPLATE_ID = 10;

    private final Injector<ClientTransport> managementClientApiInjector = new Injector<>();
    private final Injector<TopologyManager> topologyManagerInjector = new Injector<>();
    private final Injector<Partition> partitionInjector = new Injector<>();

    // Reusable message views; each is re-wrapped around / re-populated for every message.
    private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    private final ErrorResponse errorResponse = new ErrorResponse();
    private final ListSnapshotsRequest listSnapshotsRequest = new ListSnapshotsRequest();
    private final ListSnapshotsResponse listSnapshotsResponse = new ListSnapshotsResponse();
    private final FetchSnapshotChunkRequest fetchSnapshotChunkRequest = new FetchSnapshotChunkRequest();
    private final FetchSnapshotChunkResponse fetchSnapshotChunkResponse = new FetchSnapshotChunkResponse();

    /** Snapshots listed by the leader which are still waiting to be fetched. */
    private final Queue<ListSnapshotsResponse.SnapshotMetadata> snapshotsToReplicate = new ArrayDeque<>();

    /** Target interval between two consecutive list requests. */
    private final Duration pollInterval;

    private ClientTransport clientTransport;
    private TopologyManager topologyManager;
    private Partition partition;
    private int leaderNodeId;
    private String actorName;

    /** Actor clock time (milliseconds) at which the last list request was issued. */
    private long lastPollEpoch;

    /** Writer of the snapshot currently being fetched; {@code null} when none is in progress. */
    private SnapshotWriter currentSnapshotWriter;

    /** Metadata of the snapshot currently being fetched; {@code null} when none is in progress. */
    private ListSnapshotsResponse.SnapshotMetadata currentReplicatingSnapshot;

    /** Offset of the next chunk to request for the snapshot currently being fetched. */
    private long chunkOffset;

    /**
     * @param duration interval between two consecutive polls of the leader for its snapshots
     */
    public SnapshotReplicationService(Duration duration) {
        this.pollInterval = duration;
    }

    /**
     * Resolves the injected dependencies, derives the actor name from the partition id and submits
     * this actor to the scheduler of the given start context.
     *
     * @param serviceStartContext context of the starting service
     */
    public void start(ServiceStartContext serviceStartContext) {
        this.clientTransport = this.managementClientApiInjector.getValue();
        this.partition = this.partitionInjector.getValue();
        this.topologyManager = this.topologyManagerInjector.getValue();

        final int partitionId = this.partition.getInfo().getPartitionId();
        this.listSnapshotsRequest.setPartitionId(partitionId);
        this.actorName = String.format("partition-%d-snap-repl", partitionId);

        LOG.debug("Starting replication for partition {}", this.partition.getInfo());
        serviceStartContext.async(serviceStartContext.getScheduler().submitActor(this), true);
    }

    /**
     * Closes the actor; the service stop is completed asynchronously with the actor's close future.
     *
     * @param serviceStopContext context of the stopping service
     */
    public void stop(ServiceStopContext serviceStopContext) {
        LOG.debug("Stopping replication for partition {}", this.partition.getInfo());
        serviceStopContext.async(this.actor.close());
    }

    /**
     * Returns this service instance.
     *
     * <p>The decompiler renamed the original {@code get} method to {@link #m24get()} because it
     * was merged with its bridge method; this method restores the original name.
     * NOTE(review): presumably this is the accessor required by {@link Service} - confirm.
     *
     * @return this service
     */
    public SnapshotReplicationService get() {
        return this;
    }

    /**
     * Decompiler-generated alias of {@link #get()}, kept so that callers which were decompiled
     * against the renamed method keep working.
     *
     * @return this service
     */
    public SnapshotReplicationService m24get() {
        return get();
    }

    /** Kicks off the replication cycle as soon as the actor is started. */
    protected void onActorStarted() {
        pollLeaderForSnapshots();
    }

    /** Drops all pending work and aborts a snapshot which is currently being fetched, if any. */
    protected void onActorClosing() {
        this.snapshotsToReplicate.clear();
        abortCurrentReplication();
    }

    /**
     * @return the partition specific actor name once {@link #start(ServiceStartContext)} assigned
     *     it, the default name of the super class before that
     */
    public String getName() {
        return this.actorName == null ? super.getName() : this.actorName;
    }

    /**
     * Queries the topology for the partition leader. On a failed query or an unknown leader the
     * lookup is retried after {@link #ERROR_RETRY_INTERVAL}; otherwise the leader is remembered
     * and polled for its snapshots.
     */
    private void pollLeaderForSnapshots() {
        this.actor.runOnCompletion(this.topologyManager.query(this::getLeaderInfo), (nodeInfo, th) -> {
            if (th != null) {
                LOG.error("Failed to query topology for leader info, retrying", th);
                this.actor.runDelayed(ERROR_RETRY_INTERVAL, this::pollLeaderForSnapshots);
            } else if (nodeInfo == null) {
                LOG.trace("Waiting for leader node info, retrying");
                this.actor.runDelayed(ERROR_RETRY_INTERVAL, this::pollLeaderForSnapshots);
            } else {
                this.leaderNodeId = nodeInfo.getNodeId();
                LOG.trace("Updated leader node as {}", this.leaderNodeId);
                pollSnapshots();
            }
        });
    }

    /**
     * Sends the list snapshots request to the current leader and records when it was sent. A
     * transport failure restarts the cycle from the leader lookup after
     * {@link #ERROR_RETRY_INTERVAL}.
     */
    private void pollSnapshots() {
        this.lastPollEpoch = ActorClock.currentTimeMillis();
        LOG.trace("Polling snapshots from {}", this.leaderNodeId);
        // anything left over from a previous poll is superseded by the upcoming response
        this.snapshotsToReplicate.clear();

        // the future is passed on directly so that its type argument is inferred for the callback
        this.actor.runOnCompletion(this.clientTransport.getOutput().sendRequest(Integer.valueOf(this.leaderNodeId), this.listSnapshotsRequest), (clientResponse, th) -> {
            if (th == null) {
                handleListSnapshotsResponse(clientResponse.getResponseBuffer());
            } else {
                LOG.error("Error listing snapshots from leader", th);
                this.actor.runDelayed(ERROR_RETRY_INTERVAL, this::pollLeaderForSnapshots);
            }
        });
    }

    /**
     * Schedules the next poll so that polls are {@code pollInterval} apart, measured from the
     * time the last poll was issued; polls right away when the interval has already elapsed.
     */
    private void schedulePollSnapshots() {
        final long elapsedMillis = ActorClock.currentTimeMillis() - this.lastPollEpoch;
        final Duration remaining = this.pollInterval.minusMillis(elapsedMillis);
        if (remaining.isNegative() || remaining.isZero()) {
            this.actor.run(this::pollSnapshots);
        } else {
            this.actor.runDelayed(remaining, this::pollSnapshots);
        }
    }

    /**
     * Queues every snapshot of the response which does not exist in the local snapshot storage
     * and starts replicating the queue. An error response restarts the cycle from the leader
     * lookup after {@link #ERROR_RETRY_INTERVAL}.
     *
     * @param directBuffer buffer containing the response to the list snapshots request
     */
    private void handleListSnapshotsResponse(DirectBuffer directBuffer) {
        if (isErrorResponse(directBuffer)) {
            logErrorResponse("Error listing snapshots", directBuffer);
            this.actor.runDelayed(ERROR_RETRY_INTERVAL, this::pollLeaderForSnapshots);
            return;
        }

        this.listSnapshotsResponse.wrap(directBuffer);
        for (ListSnapshotsResponse.SnapshotMetadata snapshotMetadata : this.listSnapshotsResponse.getSnapshots()) {
            if (!this.partition.getSnapshotStorage().snapshotExists(snapshotMetadata.getName(), snapshotMetadata.getLogPosition())) {
                this.snapshotsToReplicate.add(snapshotMetadata);
            }
        }

        LOG.trace("Replicating {} snapshots", this.snapshotsToReplicate.size());
        replicateNextSnapshot();
    }

    /**
     * Takes the next snapshot from the queue and starts fetching it into a new temporary
     * snapshot; schedules the next poll when the queue is empty. A snapshot which cannot be
     * started is skipped.
     */
    private void replicateNextSnapshot() {
        this.chunkOffset = 0L;
        this.currentReplicatingSnapshot = this.snapshotsToReplicate.poll();
        if (this.currentReplicatingSnapshot == null) {
            schedulePollSnapshots();
            return;
        }

        try {
            this.currentSnapshotWriter = this.partition.getSnapshotStorage().createTemporarySnapshot(this.currentReplicatingSnapshot.getName(), this.currentReplicatingSnapshot.getLogPosition());
            replicateSnapshot();
        } catch (Exception e) {
            LOG.error("Could not create writer for {}", this.currentReplicatingSnapshot, e);
            // The writer field is always null on entry to the try block, so it is non-null here
            // only if the writer was created and the first request failed; abort it in that case
            // instead of leaking the temporary snapshot.
            abortAndReplicateNext();
        }
    }

    /**
     * Requests the next chunk of the current snapshot from the leader. A transport failure aborts
     * the current snapshot and continues with the next queued one.
     */
    private void replicateSnapshot() {
        this.actor.runOnCompletion(this.clientTransport.getOutput().sendRequest(Integer.valueOf(this.leaderNodeId), requestForNextChunk()), (clientResponse, th) -> {
            if (th == null) {
                handleFetchSnapshotChunkResponse(clientResponse.getResponseBuffer());
            } else {
                LOG.error("Error fetching chunk", th);
                abortAndReplicateNext();
            }
        });
    }

    /**
     * Appends the received chunk to the current snapshot writer and either requests the next
     * chunk or, once at least the announced snapshot length was received, finalizes the snapshot.
     * An error response or a failed write aborts the current snapshot and continues with the
     * next queued one.
     *
     * @param directBuffer buffer containing the response to the fetch chunk request
     */
    private void handleFetchSnapshotChunkResponse(DirectBuffer directBuffer) {
        if (isErrorResponse(directBuffer)) {
            logErrorResponse("Error fetching chunk", directBuffer);
            abortAndReplicateNext();
            return;
        }

        this.fetchSnapshotChunkResponse.wrap(directBuffer);
        try {
            // keep a handle on the chunk: its capacity is needed to advance the offset
            final DirectBuffer chunk = this.fetchSnapshotChunkResponse.getData();
            StreamUtil.write(chunk, this.currentSnapshotWriter.getOutputStream());
            this.chunkOffset += chunk.capacity();

            if (this.chunkOffset >= this.currentReplicatingSnapshot.getLength()) {
                finalizeSnapshot();
            } else {
                replicateSnapshot();
            }
        } catch (Exception e) {
            LOG.error("Error writing chunk", e);
            abortAndReplicateNext();
        }
    }

    /**
     * Validates the fetched snapshot against the checksum announced by the leader and commits
     * it, then continues with the next queued snapshot. A failed commit aborts the snapshot.
     */
    private void finalizeSnapshot() {
        try {
            this.currentSnapshotWriter.validateAndCommit(this.currentReplicatingSnapshot.getChecksum());
            this.currentSnapshotWriter = null;
            replicateNextSnapshot();
        } catch (Exception e) {
            LOG.error("Error committing, aborting", e);
            abortAndReplicateNext();
        }
    }

    /**
     * Resets the replication state and aborts the current snapshot writer, if there is one. The
     * writer reference is cleared first so that the same writer is never aborted twice.
     */
    private void abortCurrentReplication() {
        this.chunkOffset = 0L;
        this.currentReplicatingSnapshot = null;

        final SnapshotWriter writer = this.currentSnapshotWriter;
        this.currentSnapshotWriter = null;
        if (writer != null) {
            writer.abort();
        }
    }

    /** Aborts the snapshot currently being fetched and continues with the next queued one. */
    private void abortAndReplicateNext() {
        abortCurrentReplication();
        replicateNextSnapshot();
    }

    /**
     * Populates the reusable fetch request for the next chunk of the current snapshot.
     *
     * @return the reusable request, set to the current snapshot and chunk offset
     */
    private FetchSnapshotChunkRequest requestForNextChunk() {
        return this.fetchSnapshotChunkRequest
                .setPartitionId(this.partition.getInfo().getPartitionId())
                .setName(this.currentReplicatingSnapshot.getName())
                .setLogPosition(this.currentReplicatingSnapshot.getLogPosition())
                .setChunkLength(DEFAULT_CHUNK_LENGTH)
                .setChunkOffset(this.chunkOffset);
    }

    /**
     * Decodes the buffer as an error response and logs its code and message.
     *
     * @param str context which is prepended to the log message
     * @param directBuffer buffer containing the error response
     */
    private void logErrorResponse(String str, DirectBuffer directBuffer) {
        this.errorResponse.wrap(directBuffer);
        LOG.error("{} - {} - {}", str, this.errorResponse.getCode(), this.errorResponse.getMessage());
    }

    /**
     * @param directBuffer buffer containing a response, starting with a message header at offset 0
     * @return {@code true} if the template id of the header is the one treated as error response
     */
    private boolean isErrorResponse(DirectBuffer directBuffer) {
        this.messageHeaderDecoder.m209wrap(directBuffer, 0);
        return this.messageHeaderDecoder.templateId() == ERROR_RESPONSE_TEMPLATE_ID;
    }

    /**
     * Topology query which looks up the leader of the injected partition.
     *
     * @param readableTopology topology to look the leader up in
     * @return the leader as reported by the topology; callers treat {@code null} as "not known yet"
     */
    private NodeInfo getLeaderInfo(ReadableTopology readableTopology) {
        return readableTopology.getLeader(this.partition.getInfo().getPartitionId());
    }

    /** @return injector for the client transport used to send requests to the leader */
    public Injector<ClientTransport> getManagementClientApiInjector() {
        return this.managementClientApiInjector;
    }

    /** @return injector for the partition whose snapshots are replicated */
    public Injector<Partition> getPartitionInjector() {
        return this.partitionInjector;
    }

    /** @return injector for the topology manager used to look up the partition leader */
    public Injector<TopologyManager> getTopologyManagerInjector() {
        return this.topologyManagerInjector;
    }
}
