package org.neo4j.causalclustering.core.consensus.shipping;

import java.io.IOException;
import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.neo4j.causalclustering.core.consensus.LeaderContext;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembership;
import org.neo4j.causalclustering.core.consensus.outcome.ShipCommand;
import org.neo4j.causalclustering.core.consensus.schedule.TimerService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/causalclustering/core/consensus/shipping/RaftLogShippingManager.class */
/**
 * Owns one {@link RaftLogShipper} per replication member other than this instance and keeps
 * that set aligned with the current {@link RaftMembership}.
 *
 * <p>All state-changing methods are {@code synchronized} on this instance. Once {@link #stop()}
 * has been called, {@link #resume(LeaderContext)} becomes a no-op.
 */
public class RaftLogShippingManager extends LifecycleAdapter implements RaftMembership.Listener {
    private final Outbound<MemberId, RaftMessages.RaftMessage> outbound;
    private final LogProvider logProvider;
    private final ReadableRaftLog raftLog;
    private final Clock clock;
    private final MemberId myself;
    private final RaftMembership membership;
    private final long retryTimeMillis;
    private final int catchupBatchSize;
    private final int maxAllowedShippingLag;
    private final InFlightCache inFlightCache;
    private final TimerService timerService;

    /** Active shippers keyed by the member they ship to; never contains {@link #myself}. */
    private final Map<MemberId, RaftLogShipper> logShippers = new HashMap<>();

    /** Most recent context seen by {@link #resume} or {@link #handleCommands}; null until then. */
    private LeaderContext lastLeaderContext;
    private boolean running;
    private boolean stopped;

    /**
     * Creates the manager and registers it as a listener on {@code raftMembership}.
     *
     * @param outbound              passed through to every created shipper
     * @param logProvider           passed through to every created shipper
     * @param readableRaftLog       passed through to every created shipper
     * @param timerService          passed through to every created shipper
     * @param clock                 passed through to every created shipper
     * @param memberId              identity of this instance; no shipper is created for it
     * @param raftMembership        source of the replication members to ship to
     * @param retryTimeMillis       passed through to every created shipper
     * @param catchupBatchSize      passed through to every created shipper
     * @param maxAllowedShippingLag passed through to every created shipper
     * @param inFlightCache         passed through to every created shipper
     */
    public RaftLogShippingManager(Outbound<MemberId, RaftMessages.RaftMessage> outbound, LogProvider logProvider, ReadableRaftLog readableRaftLog, TimerService timerService, Clock clock, MemberId memberId, RaftMembership raftMembership, long retryTimeMillis, int catchupBatchSize, int maxAllowedShippingLag, InFlightCache inFlightCache) {
        this.outbound = outbound;
        this.logProvider = logProvider;
        this.raftLog = readableRaftLog;
        this.timerService = timerService;
        this.clock = clock;
        this.myself = memberId;
        this.membership = raftMembership;
        this.retryTimeMillis = retryTimeMillis;
        this.catchupBatchSize = catchupBatchSize;
        this.maxAllowedShippingLag = maxAllowedShippingLag;
        this.inFlightCache = inFlightCache;
        // Registered last so every field is assigned before a membership callback can arrive.
        raftMembership.registerListener(this);
    }

    /**
     * Stops every active shipper, forgets them, and marks the manager as not running.
     * Membership changes are ignored until {@link #resume(LeaderContext)} is called.
     */
    public synchronized void pause() {
        this.running = false;
        this.logShippers.values().forEach(RaftLogShipper::stop);
        this.logShippers.clear();
    }

    /**
     * Marks the manager as running and makes sure a shipper exists for every current
     * replication member. Does nothing if {@link #stop()} has already been called.
     *
     * @param leaderContext context used to initialise newly created shippers; also recorded
     *                      for shippers created later by {@link #onMembershipChanged()}
     */
    public synchronized void resume(LeaderContext leaderContext) {
        if (this.stopped) {
            return;
        }
        this.running = true;
        for (MemberId member : this.membership.replicationMembers()) {
            ensureLogShipperRunning(member, leaderContext);
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Pauses the manager and marks it as stopped, after which {@link #resume(LeaderContext)}
     * has no effect.
     */
    public synchronized void stop() {
        pause();
        this.stopped = true;
    }

    /**
     * Creates, registers and starts a shipper for {@code memberId} unless one is already
     * registered or the member is this instance itself.
     */
    private void ensureLogShipperRunning(MemberId memberId, LeaderContext leaderContext) {
        if (this.logShippers.get(memberId) != null || memberId.equals(this.myself)) {
            return;
        }
        RaftLogShipper shipper = new RaftLogShipper(this.outbound, this.logProvider, this.raftLog, this.clock, this.timerService, this.myself, memberId, leaderContext.term, leaderContext.commitIndex, this.retryTimeMillis, this.catchupBatchSize, this.maxAllowedShippingLag, this.inFlightCache);
        this.logShippers.put(memberId, shipper);
        shipper.start();
    }

    /**
     * Applies each command, in iteration order, to every active shipper and then records
     * {@code leaderContext} as the latest known context.
     *
     * @param iterable      commands to apply
     * @param leaderContext context handed to each command application
     * @throws IOException declared by this method; propagated from applying a command, in
     *                     which case the recorded context is left unchanged
     */
    public synchronized void handleCommands(Iterable<ShipCommand> iterable, LeaderContext leaderContext) throws IOException {
        for (ShipCommand command : iterable) {
            for (RaftLogShipper shipper : this.logShippers.values()) {
                command.applyTo(shipper, leaderContext);
            }
        }
        this.lastLeaderContext = leaderContext;
    }

    /**
     * Reconciles the shipper set with the current replication members: shippers for members
     * that are no longer present are stopped and removed, and shippers are created for new
     * members. Ignored while not running or before any leader context has been recorded.
     */
    @Override // org.neo4j.causalclustering.core.consensus.membership.RaftMembership.Listener
    public synchronized void onMembershipChanged() {
        if (this.lastLeaderContext == null || !this.running) {
            return;
        }
        // Work on a copy of the keys so shippers can be removed from the map while iterating.
        HashSet<MemberId> departed = new HashSet<>(this.logShippers.keySet());
        departed.removeAll(this.membership.replicationMembers());
        for (MemberId member : departed) {
            RaftLogShipper shipper = this.logShippers.remove(member);
            if (shipper != null) {
                shipper.stop();
            }
        }
        for (MemberId member : this.membership.replicationMembers()) {
            ensureLogShipperRunning(member, this.lastLeaderContext);
        }
    }

    /** Returns a description listing the active shippers and this instance's identity. */
    @Override
    public String toString() {
        return String.format("RaftLogShippingManager{logShippers=%s, myself=%s}", this.logShippers, this.myself);
    }
}
