package org.neo4j.coreedge.raft.replication.shipping;

import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.raft.LeaderContext;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.RenewableTimeoutService;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.helpers.Clock;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/raft/replication/shipping/RaftLogShipper.class */
/**
 * Ships entries from a {@link ReadableRaftLog} to one follower through an {@link Outbound} channel.
 *
 * <p>The shipper is a small state machine driven by the {@code on*} callbacks:
 * <ul>
 *   <li>{@link Mode#MISMATCH} (initial): sends single-entry requests, stepping the index back on
 *       every reported mismatch until a match is reported;</li>
 *   <li>{@link Mode#CATCHUP}: after each match that covers the last sent index, sends the next
 *       batch of at most {@code catchupBatchSize} entries;</li>
 *   <li>{@link Mode#PIPELINE}: forwards each new entry as it is announced and sends heartbeats on
 *       commit updates.</li>
 * </ul>
 * A resend timeout is (re)armed whenever entries are sent; when it fires the last sent index is
 * sent again as a single-entry request.
 *
 * <p>All state-changing public methods are {@code synchronized} on this instance.
 *
 * @param <MEMBER> type used to identify cluster members
 */
public class RaftLogShipper<MEMBER> {
    /** Lowest index {@link #onMismatch} will step back to. */
    private static final long MIN_INDEX = 0L;

    private final Outbound<MEMBER> outbound;
    private final LogProvider logProvider;
    private final Log log;
    private final ReadableRaftLog raftLog;
    private final Clock clock;
    private final MEMBER follower;
    private final MEMBER leader;
    private final long retryTimeMillis;
    private final int catchupBatchSize;
    private final int maxAllowedShippingLag;
    private final RenewableTimeoutService.TimeoutName timeoutName = () -> "RESEND";

    /** Created in {@link #start()}; {@code null} until then. */
    private DelayedRenewableTimeoutService timeoutService;
    /** Handle of the currently scheduled resend timeout, if any was ever scheduled. */
    private RenewableTimeoutService.RenewableTimeout timeout;
    /** Absolute clock time at which the resend timeout is due; {@code 0} means "no timeout armed". */
    private long timeoutAbsoluteMillis;
    /** Index recorded by the most recent send method. */
    private long lastSentIndex;
    /** Highest index reported through {@link #onMatch}; {@code -1} until the first match. */
    private long matchIndex = -1;
    /** Context from the most recent callback; used for timeout-driven resends. */
    private LeaderContext lastLeaderContext;
    private Mode mode = Mode.MISMATCH;

    /** Shipping modes; see the class documentation for the transitions between them. */
    public enum Mode {
        MISMATCH,
        CATCHUP,
        PIPELINE
    }

    /**
     * Creates a shipper in {@link Mode#MISMATCH}. Nothing is sent until {@link #start()} is called.
     *
     * @param outbound              channel used to send messages to the follower
     * @param logProvider           source of the logger; also handed to the timeout service
     * @param raftLog               log the shipped entries are read from
     * @param clock                 time source for timeout bookkeeping
     * @param leader                member placed in outgoing messages as the sender
     * @param follower              member the messages are sent to
     * @param leaderTerm            first argument of the initial {@link LeaderContext}
     *                              (presumably the leader's term -- confirm against LeaderContext)
     * @param leaderCommit          second argument of the initial {@link LeaderContext}
     *                              (presumably the leader's commit index -- confirm against LeaderContext)
     * @param retryTimeMillis       delay before an unanswered send is retried
     * @param catchupBatchSize      maximum number of entries per catch-up batch
     * @param maxAllowedShippingLag maximum distance between a new entry's previous index and the
     *                              match index before pipelining is abandoned
     */
    public RaftLogShipper(Outbound<MEMBER> outbound, LogProvider logProvider, ReadableRaftLog raftLog, Clock clock,
            MEMBER leader, MEMBER follower, long leaderTerm, long leaderCommit, long retryTimeMillis,
            int catchupBatchSize, int maxAllowedShippingLag) {
        this.outbound = outbound;
        this.catchupBatchSize = catchupBatchSize;
        this.maxAllowedShippingLag = maxAllowedShippingLag;
        this.logProvider = logProvider;
        this.log = logProvider.getLog(getClass());
        this.raftLog = raftLog;
        this.clock = clock;
        this.follower = follower;
        this.leader = leader;
        this.retryTimeMillis = retryTimeMillis;
        this.lastLeaderContext = new LeaderContext(leaderTerm, leaderCommit);
    }

    /**
     * @return the follower this shipper sends to
     */
    public Object identity() {
        return this.follower;
    }

    /**
     * Creates and starts the timeout service, then sends the entry at the log's current append
     * index as a first single-entry request. Failures of either step are logged, not rethrown.
     */
    public synchronized void start() {
        this.log.info("Starting log shipper to: " + this.follower);
        try {
            this.timeoutService = new DelayedRenewableTimeoutService(this.clock, this.logProvider);
            this.timeoutService.init();
            this.timeoutService.start();
        } catch (Throwable th) {
            this.log.error("Failed to start log shipper to: " + this.follower, th);
        }
        try {
            sendSingle(this.raftLog.appendIndex(), this.lastLeaderContext);
        } catch (RaftStorageException e) {
            this.log.error("Exception during send: " + this.follower, e);
        }
    }

    /**
     * Stops and shuts down the timeout service and disarms the resend timeout. Failures while
     * stopping the service are logged, not rethrown.
     */
    public synchronized void stop() {
        this.log.info("Stopping log shipper to: " + this.follower);
        try {
            this.timeoutService.stop();
            this.timeoutService.shutdown();
        } catch (Throwable th) {
            // Message previously said "start", which misreported shutdown failures.
            this.log.error("Failed to stop log shipper to: " + this.follower, th);
        }
        abortTimeout();
    }

    /**
     * Handles a mismatch report from the follower.
     *
     * <p>In {@link Mode#MISMATCH} the next probe is one step before the last sent index, but no
     * higher than {@code reportedIndex} and no lower than {@link #MIN_INDEX}. In any other mode
     * the shipper falls back to {@link Mode#MISMATCH} and resends the last sent index.
     *
     * @param reportedIndex index supplied with the mismatch (presumably the follower's last
     *                      append index -- confirm against the caller)
     * @param leaderContext context to use for the outgoing request; remembered for later resends
     * @throws RaftStorageException if reading from the raft log fails
     */
    public synchronized void onMismatch(long reportedIndex, LeaderContext leaderContext) throws RaftStorageException {
        switch (this.mode) {
            case MISMATCH:
                long steppedBack = Long.min(this.lastSentIndex - 1, reportedIndex);
                sendSingle(Long.max(steppedBack, MIN_INDEX), leaderContext);
                break;
            case PIPELINE:
            case CATCHUP:
                this.log.info(String.format("Mismatch in mode %s from follower %s", this.mode, this.follower));
                this.mode = Mode.MISMATCH;
                sendSingle(this.lastSentIndex, leaderContext);
                break;
            default:
                break;
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Handles a match report from the follower. The stored match index only ever moves forward.
     *
     * @param newMatchIndex index reported as matching
     * @param leaderContext context to use for any outgoing request; remembered for later resends
     * @throws RaftStorageException if reading from the raft log fails
     */
    public synchronized void onMatch(long newMatchIndex, LeaderContext leaderContext) throws RaftStorageException {
        boolean progress = newMatchIndex > this.matchIndex;
        this.matchIndex = Long.max(newMatchIndex, this.matchIndex);

        switch (this.mode) {
            case MISMATCH:
                if (sendNextBatchAfterMatch(leaderContext)) {
                    this.log.info(String.format("Caught up after mismatch: %s", this.follower));
                    this.mode = Mode.PIPELINE;
                } else {
                    this.log.info(String.format("Starting catch up after mismatch: %s", this.follower));
                    this.mode = Mode.CATCHUP;
                }
                break;
            case PIPELINE:
                if (this.matchIndex == this.lastSentIndex) {
                    // Everything sent so far is acknowledged; nothing left to retry.
                    abortTimeout();
                } else if (progress) {
                    // The follower is advancing; give it a fresh retry window.
                    scheduleTimeout(this.retryTimeMillis);
                }
                break;
            case CATCHUP:
                // Only send the next batch once the previous one is fully acknowledged.
                if (this.matchIndex >= this.lastSentIndex && sendNextBatchAfterMatch(leaderContext)) {
                    this.log.info(String.format("Caught up: %s", this.follower));
                    this.mode = Mode.PIPELINE;
                }
                break;
            default:
                break;
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Handles the announcement of a new log entry. Only acts in {@link Mode#PIPELINE}: the entry
     * is forwarded unless the follower lags more than {@code maxAllowedShippingLag} behind, in
     * which case the shipper drops to {@link Mode#CATCHUP} without sending.
     *
     * @param prevLogIndex  index of the entry preceding {@code newLogEntry}
     * @param prevLogTerm   term of the entry at {@code prevLogIndex}
     * @param newLogEntry   the entry to forward
     * @param leaderContext context to use for the outgoing request; remembered for later resends
     * @throws RaftStorageException declared by the send path
     */
    public synchronized void onNewEntry(long prevLogIndex, long prevLogTerm, RaftLogEntry newLogEntry,
            LeaderContext leaderContext) throws RaftStorageException {
        if (this.mode == Mode.PIPELINE) {
            // sendNewEntry advances lastSentIndex past prevLogIndex, which ends the loop.
            while (this.lastSentIndex <= prevLogIndex) {
                if (prevLogIndex - this.matchIndex > this.maxAllowedShippingLag) {
                    this.mode = Mode.CATCHUP;
                    break;
                }
                sendNewEntry(prevLogIndex, prevLogTerm, newLogEntry, leaderContext);
            }
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Handles a commit index update; in {@link Mode#PIPELINE} a heartbeat carrying the new commit
     * index is sent. In every mode the context is remembered for later resends.
     *
     * @param leaderContext context carrying the current term and commit index
     * @throws RaftStorageException declared by the send path
     */
    public synchronized void onCommitUpdate(LeaderContext leaderContext) throws RaftStorageException {
        if (this.mode == Mode.PIPELINE) {
            sendCommitUpdate(leaderContext);
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Callback invoked by the timeout service. Because the deadline may have been moved or
     * cleared since the timer was created, the deadline is re-checked against the clock: if it
     * has passed the timeout is handled, if it is still ahead the timer is re-armed for the
     * remaining time, and if no deadline is armed nothing happens.
     */
    public synchronized void onScheduledTimeoutExpiry() {
        try {
            if (timedOut()) {
                onTimeout();
            } else if (this.timeoutAbsoluteMillis != 0) {
                long timeLeft = this.timeoutAbsoluteMillis - this.clock.currentTimeMillis();
                if (timeLeft > 0) {
                    scheduleTimeout(timeLeft);
                } else {
                    onTimeout();
                }
            }
        } catch (RaftStorageException e) {
            this.log.error("Exception during timeout handling: " + this.follower, e);
        }
    }

    /**
     * Reacts to an expired resend timeout: pipelining is abandoned in favour of
     * {@link Mode#CATCHUP}, and in every mode the last sent index is resent with the last known
     * leader context (if there is one).
     */
    private void onTimeout() throws RaftStorageException {
        if (this.mode == Mode.PIPELINE) {
            this.mode = Mode.CATCHUP;
        }
        if (this.lastLeaderContext != null) {
            sendSingle(this.lastSentIndex, this.lastLeaderContext);
        }
    }

    /** @return {@code true} if a deadline is armed and the clock has reached it */
    private boolean timedOut() {
        return this.timeoutAbsoluteMillis != 0 && this.clock.currentTimeMillis() - this.timeoutAbsoluteMillis >= 0;
    }

    /**
     * Arms the resend timeout {@code delayMillis} from now, cancelling any previously created
     * timeout handle first.
     */
    private void scheduleTimeout(long delayMillis) {
        this.timeoutAbsoluteMillis = this.clock.currentTimeMillis() + delayMillis;
        if (this.timeout != null) {
            this.timeout.cancel();
        }
        // NOTE(review): meaning of the third argument (0L) is defined by the timeout service -- confirm.
        this.timeout = this.timeoutService.create(this.timeoutName, delayMillis, 0L,
                expired -> onScheduledTimeoutExpiry());
    }

    /** Cancels the current timeout handle, if any, and clears the deadline. */
    private void abortTimeout() {
        if (this.timeout != null) {
            this.timeout.cancel();
        }
        this.timeoutAbsoluteMillis = 0L;
    }

    /**
     * Sends the next batch of entries following the match index, limited to
     * {@code catchupBatchSize} entries.
     *
     * @return {@code true} if the follower already matches the log's append index or the batch
     *         reaches up to it; {@code false} if more batches are needed
     */
    private boolean sendNextBatchAfterMatch(LeaderContext leaderContext) throws RaftStorageException {
        long appendIndex = this.raftLog.appendIndex();
        if (appendIndex <= this.matchIndex) {
            return true;
        }
        long batchEnd = Long.min(appendIndex, this.matchIndex + this.catchupBatchSize);
        scheduleTimeout(this.retryTimeMillis);
        sendRange(this.matchIndex + 1, batchEnd, leaderContext);
        return batchEnd == appendIndex;
    }

    /** Sends a heartbeat built from the leader, the context's term and its commit index. */
    private void sendCommitUpdate(LeaderContext leaderContext) throws RaftStorageException {
        // NOTE(review): the term is passed twice (2nd and 4th argument); kept exactly as found.
        this.outbound.send(this.follower, new RaftMessages.Heartbeat(this.leader, leaderContext.term,
                leaderContext.commitIndex, leaderContext.term));
    }

    /**
     * Arms the retry timeout, records {@code logIndex} as last sent and sends an append request
     * containing the entry at {@code logIndex}, or no entries if the log has none at that index.
     * The request is not sent if the preceding entry's term is newer than the context's term.
     */
    private void sendSingle(long logIndex, LeaderContext leaderContext) throws RaftStorageException {
        scheduleTimeout(this.retryTimeMillis);
        this.lastSentIndex = logIndex;

        long prevLogIndex = logIndex - 1;
        long prevLogTerm = this.raftLog.readEntryTerm(prevLogIndex);
        if (prevLogTerm > leaderContext.term) {
            this.log.warn(String.format("Aborting send. Not leader anymore? %s, prevLogTerm=%d",
                    leaderContext, prevLogTerm));
            return;
        }

        RaftLogEntry[] entries = this.raftLog.entryExists(logIndex)
                ? new RaftLogEntry[]{this.raftLog.readLogEntry(logIndex)}
                : RaftLogEntry.empty;
        this.outbound.send(this.follower, new RaftMessages.AppendEntries.Request(this.leader, leaderContext.term,
                prevLogIndex, prevLogTerm, entries, leaderContext.commitIndex));
    }

    /**
     * Arms the retry timeout, records {@code prevLogIndex + 1} as last sent and sends an append
     * request carrying exactly {@code newEntry}. The raft log is not consulted.
     */
    private void sendNewEntry(long prevLogIndex, long prevLogTerm, RaftLogEntry newEntry,
            LeaderContext leaderContext) throws RaftStorageException {
        scheduleTimeout(this.retryTimeMillis);
        this.lastSentIndex = prevLogIndex + 1;
        this.outbound.send(this.follower, new RaftMessages.AppendEntries.Request(this.leader, leaderContext.term,
                prevLogIndex, prevLogTerm, new RaftLogEntry[]{newEntry}, leaderContext.commitIndex));
    }

    /**
     * Sends the entries {@code startIndex..endIndex} (inclusive) in one append request. Does
     * nothing for an empty range. {@code lastSentIndex} is updated before the term checks, so it
     * is advanced even when the send is aborted because the preceding entry or any entry in the
     * range has a term newer than the context's term.
     */
    private void sendRange(long startIndex, long endIndex, LeaderContext leaderContext) throws RaftStorageException {
        if (startIndex > endIndex) {
            return;
        }
        this.lastSentIndex = endIndex;

        long prevLogIndex = startIndex - 1;
        long prevLogTerm = this.raftLog.readEntryTerm(prevLogIndex);
        if (prevLogTerm > leaderContext.term) {
            this.log.warn(String.format("Aborting send. Not leader anymore? %s, prevLogTerm=%d",
                    leaderContext, prevLogTerm));
            return;
        }

        int count = (int) ((endIndex - startIndex) + 1);
        RaftLogEntry[] entries = new RaftLogEntry[count];
        for (int offset = 0; offset < count; offset++) {
            RaftLogEntry entry = this.raftLog.readLogEntry(startIndex + offset);
            entries[offset] = entry;
            if (entry.term() > leaderContext.term) {
                this.log.warn(String.format("Aborting send. Not leader anymore? %s, entryTerm=%d",
                        leaderContext, entry.term()));
                return;
            }
        }

        this.outbound.send(this.follower, new RaftMessages.AppendEntries.Request(this.leader, leaderContext.term,
                prevLogIndex, prevLogTerm, entries, leaderContext.commitIndex));
    }
}
