package org.neo4j.coreedge.raft.replication.shipping;

import java.io.IOException;
import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.neo4j.coreedge.raft.LeaderContext;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
import org.neo4j.coreedge.raft.membership.RaftMembership;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.outcome.ShipCommand;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/raft/replication/shipping/RaftLogShippingManager.class */
/**
 * Keeps one {@link RaftLogShipper} per replication member (excluding this member itself) and
 * forwards {@link ShipCommand}s to all of them.
 *
 * <p>The manager registers itself as a {@link RaftMembership.Listener} on construction and
 * reconciles its set of shippers whenever the membership changes while it is running.
 *
 * <p>All state-changing methods are {@code synchronized} on this instance; {@link #toString()}
 * is not.
 */
public class RaftLogShippingManager extends LifecycleAdapter implements RaftMembership.Listener {
    private final Outbound<CoreMember, RaftMessages.RaftMessage> outbound;
    private final LogProvider logProvider;
    private final ReadableRaftLog raftLog;
    private final Clock clock;
    private final CoreMember myself;
    private final RaftMembership membership;
    private final long retryTimeMillis;
    private final int catchupBatchSize;
    private final int maxAllowedShippingLag;
    private final InFlightMap<Long, RaftLogEntry> inFlightMap;

    /** Context from the most recent {@link #resume} or {@link #handleCommands} call; null until then. */
    private LeaderContext lastLeaderContext;
    /** True between {@link #resume} and the next {@link #pause}. */
    private boolean running;
    /** Currently started shippers, keyed by the member they ship to. */
    private final Map<CoreMember, RaftLogShipper> logShippers = new HashMap<>();
    /** Set by {@link #stop()}; once true, {@link #resume} becomes a no-op. */
    private boolean stopped = false;

    /**
     * Stores the collaborators and registers this manager as a membership listener.
     *
     * @param outbound              passed to every created shipper
     * @param logProvider           passed to every created shipper
     * @param readableRaftLog       passed to every created shipper
     * @param clock                 passed to every created shipper
     * @param coreMember            this member; no shipper is ever created for it
     * @param raftMembership        source of the replication members; this manager registers
     *                              itself on it as a listener
     * @param retryTimeMillis       passed to every created shipper
     * @param catchupBatchSize      passed to every created shipper
     * @param maxAllowedShippingLag passed to every created shipper
     * @param inFlightMap           passed to every created shipper
     */
    public RaftLogShippingManager(Outbound<CoreMember, RaftMessages.RaftMessage> outbound, LogProvider logProvider, ReadableRaftLog readableRaftLog, Clock clock, CoreMember coreMember, RaftMembership raftMembership, long retryTimeMillis, int catchupBatchSize, int maxAllowedShippingLag, InFlightMap<Long, RaftLogEntry> inFlightMap) {
        this.outbound = outbound;
        this.logProvider = logProvider;
        this.raftLog = readableRaftLog;
        this.clock = clock;
        this.myself = coreMember;
        this.membership = raftMembership;
        this.retryTimeMillis = retryTimeMillis;
        this.catchupBatchSize = catchupBatchSize;
        this.maxAllowedShippingLag = maxAllowedShippingLag;
        this.inFlightMap = inFlightMap;
        raftMembership.registerListener(this);
    }

    /**
     * Marks the manager as not running, stops every shipper and forgets them all.
     */
    public synchronized void pause() {
        this.running = false;
        this.logShippers.values().forEach(RaftLogShipper::stop);
        this.logShippers.clear();
    }

    /**
     * Marks the manager as running and makes sure a shipper exists for every current
     * replication member. Does nothing once {@link #stop()} has been called.
     *
     * @param leaderContext context used to create new shippers; remembered for later
     *                      membership changes
     */
    public synchronized void resume(LeaderContext leaderContext) {
        if (this.stopped) {
            return;
        }
        this.running = true;
        for (CoreMember member : this.membership.replicationMembers()) {
            ensureLogShipperRunning(member, leaderContext);
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Pauses shipping and permanently disables {@link #resume}.
     * NOTE(review): presumably overrides the lifecycle {@code stop()} of
     * {@link LifecycleAdapter} -- confirm and add {@code @Override} if so.
     */
    public synchronized void stop() {
        pause();
        this.stopped = true;
    }

    /**
     * Returns the shipper for {@code coreMember}, creating, registering and starting one if
     * none exists yet.
     *
     * @param coreMember    member to ship to
     * @param leaderContext supplies the term and commit index for a newly created shipper
     * @return the existing or newly started shipper, or {@code null} when {@code coreMember}
     *         is this member and no shipper is registered for it
     */
    private RaftLogShipper ensureLogShipperRunning(CoreMember coreMember, LeaderContext leaderContext) {
        RaftLogShipper shipper = this.logShippers.get(coreMember);
        if (shipper == null && !coreMember.equals(this.myself)) {
            shipper = new RaftLogShipper(this.outbound, this.logProvider, this.raftLog, this.clock, this.myself, coreMember, leaderContext.term, leaderContext.commitIndex, this.retryTimeMillis, this.catchupBatchSize, this.maxAllowedShippingLag, this.inFlightMap);
            this.logShippers.put(coreMember, shipper);
            shipper.start();
        }
        return shipper;
    }

    /**
     * Applies each command, in iteration order, to every current shipper and then remembers
     * {@code leaderContext}.
     *
     * @param iterable      commands to apply
     * @param leaderContext context handed to each command application
     * @throws IOException declared by this method; the context is not recorded if the
     *                     loop is aborted by an exception
     */
    public synchronized void handleCommands(Iterable<ShipCommand> iterable, LeaderContext leaderContext) throws IOException {
        for (ShipCommand shipCommand : iterable) {
            for (RaftLogShipper shipper : this.logShippers.values()) {
                shipCommand.applyTo(shipper, leaderContext);
            }
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Reconciles the shippers with the current replication members: stops and removes shippers
     * for members that are no longer present, then starts shippers for any members that lack
     * one. Does nothing if no leader context has been recorded yet or the manager is not
     * running.
     */
    @Override // org.neo4j.coreedge.raft.membership.RaftMembership.Listener
    public synchronized void onMembershipChanged() {
        if (this.lastLeaderContext == null || !this.running) {
            return;
        }
        // Work on a copy of the key set so shippers can be removed from the map while iterating.
        HashSet<CoreMember> departedMembers = new HashSet<>(this.logShippers.keySet());
        departedMembers.removeAll(this.membership.replicationMembers());
        for (CoreMember departed : departedMembers) {
            RaftLogShipper removed = this.logShippers.remove(departed);
            if (removed != null) {
                removed.stop();
            }
        }
        for (CoreMember member : this.membership.replicationMembers()) {
            ensureLogShipperRunning(member, this.lastLeaderContext);
        }
    }

    /**
     * Renders the current shipper map and this member. Not synchronized.
     */
    @Override
    public String toString() {
        return String.format("RaftLogShippingManager{logShippers=%s, myself=%s}", this.logShippers, this.myself);
    }
}
