package org.neo4j.causalclustering.core.replication;

import java.time.Clock;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.replication.session.OperationContext;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/causalclustering/core/replication/RaftReplicator.class */
/**
 * {@link Replicator} that wraps content in a {@link DistributedOperation}, sends it to the member
 * returned by the {@link LeaderLocator} and keeps re-sending until the {@link ProgressTracker}
 * reports the operation as replicated.
 *
 * <p>The instance registers itself as a listener on the leader locator; every notification it
 * receives is forwarded to the progress tracker as a replication event.
 */
public class RaftReplicator extends LifecycleAdapter implements Replicator, Listener<MemberId> {
    private final MemberId me;
    private final Outbound<MemberId, RaftMessages.RaftMessage> outbound;
    private final ProgressTracker progressTracker;
    private final LocalSessionPool sessionPool;
    private final TimeoutStrategy progressTimeoutStrategy;
    private final AvailabilityGuard availabilityGuard;
    private final LeaderLocator leaderLocator;
    private final TimeoutStrategy leaderTimeoutStrategy;
    private final Log log;
    private final Throttler throttler;
    // Stored but not read anywhere in this class.
    private final Clock clock;

    /**
     * Creates the replicator and registers it as a listener on {@code leaderLocator}.
     *
     * @param leaderLocator source of the current leader; also the object this instance listens to
     * @param memberId identity placed in every outgoing new-entry request
     * @param outbound channel used to send requests to the leader
     * @param localSessionPool pool from which one session is acquired per replicated operation
     * @param progressTracker tracks each operation from start until replication or abort
     * @param timeoutStrategy produces the timeout used while waiting for replication progress
     * @param timeoutStrategy2 produces the back-off timeout used while no leader is known
     * @param availabilityGuard consulted before every attempt to detect database shutdown
     * @param logProvider provider of the log used for retry diagnostics
     * @param j value handed to the {@link Throttler} constructor
     *          (presumably the throttling credit limit - TODO confirm against Throttler)
     * @param clock clock instance; only stored by this class
     */
    public RaftReplicator(LeaderLocator leaderLocator, MemberId memberId, Outbound<MemberId, RaftMessages.RaftMessage> outbound, LocalSessionPool localSessionPool, ProgressTracker progressTracker, TimeoutStrategy timeoutStrategy, TimeoutStrategy timeoutStrategy2, AvailabilityGuard availabilityGuard, LogProvider logProvider, long j, Clock clock) {
        this.me = memberId;
        this.outbound = outbound;
        this.progressTracker = progressTracker;
        this.sessionPool = localSessionPool;
        this.progressTimeoutStrategy = timeoutStrategy;
        this.leaderTimeoutStrategy = timeoutStrategy2;
        this.availabilityGuard = availabilityGuard;
        this.throttler = new Throttler(j);
        this.leaderLocator = leaderLocator;
        this.clock = clock;
        this.log = logProvider.getLog(getClass());
        // Register last: the locator may call back into receive(), so "this" must not be
        // handed out before every field has been assigned.
        leaderLocator.registerListener(this);
    }

    /**
     * Replicates the given content. Content that reports a size is routed through the throttler
     * with that size; content without a size is replicated directly.
     *
     * @param replicatedContent the content to replicate
     * @param z when {@code true} the session is released only once the returned future completes;
     *          when {@code false} it is released as soon as the operation is replicated
     * @return the future result of the tracked operation
     * @throws InterruptedException if the calling thread is interrupted while replicating
     */
    @Override // org.neo4j.causalclustering.core.replication.Replicator
    @SuppressWarnings("unchecked") // the throttled call returns exactly what replicate0 returns
    public Future<Object> replicate(ReplicatedContent replicatedContent, boolean z) throws InterruptedException {
        if (!replicatedContent.hasSize()) {
            return replicate0(replicatedContent, z);
        }
        return (Future<Object>) this.throttler.invoke(() -> {
            return replicate0(replicatedContent, z);
        }, replicatedContent.size());
    }

    /**
     * Sends the operation to the leader repeatedly until it is reported as replicated.
     *
     * <p>If the attempt is abandoned, either because the thread is interrupted (while waiting for
     * progress or while backing off for a leader) or because the database has been shut down,
     * tracking of the operation is aborted before the exception is rethrown.
     *
     * @param replicatedContent the content to replicate
     * @param z whether session release is deferred until the result future completes
     * @return the future result of the tracked operation
     * @throws InterruptedException if the calling thread is interrupted
     * @throws DatabaseShutdownException if the availability guard reports shutdown
     */
    private Future<Object> replicate0(ReplicatedContent replicatedContent, boolean z) throws InterruptedException {
        OperationContext session = this.sessionPool.acquireSession();
        DistributedOperation operation = new DistributedOperation(replicatedContent, session.globalSession(), session.localOperationId());
        Progress progress = this.progressTracker.start(operation);
        TimeoutStrategy.Timeout progressTimeout = this.progressTimeoutStrategy.newTimeout();
        TimeoutStrategy.Timeout leaderTimeout = this.leaderTimeoutStrategy.newTimeout();
        try {
            do {
                assertDatabaseNotShutdown();
                try {
                    this.outbound.send(this.leaderLocator.getLeader(), new RaftMessages.NewEntry.Request(this.me, operation), true);
                    // A leader was found, so the next leaderless period starts from a fresh back-off.
                    leaderTimeout = this.leaderTimeoutStrategy.newTimeout();
                    progress.awaitReplication(progressTimeout.getMillis());
                    progressTimeout.increment();
                } catch (NoLeaderFoundException e) {
                    this.log.debug("Could not replicate operation " + operation + " because no leader was found. Retrying.", e);
                    Thread.sleep(leaderTimeout.getMillis());
                    leaderTimeout.increment();
                }
            } while (!progress.isReplicated());
        } catch (InterruptedException | DatabaseShutdownException e) {
            // Covers every abandonment path, including an interrupt during the leader back-off
            // sleep, so the tracker never keeps an operation nobody is waiting for.
            // NOTE(review): the session is not released here, matching the previous behaviour;
            // confirm whether a session with a possibly in-flight operation may be pooled again.
            this.progressTracker.abort(operation);
            throw e;
        }
        BiConsumer<Object, Throwable> releaseSession = (result, failure) -> this.sessionPool.releaseSession(session);
        if (z) {
            progress.futureResult().whenComplete(releaseSession);
        } else {
            releaseSession.accept(null, null);
        }
        return progress.futureResult();
    }

    /**
     * Listener callback; the notified member is ignored and the progress tracker is simply told
     * that a replication event occurred.
     *
     * @param memberId the member carried by the notification (unused)
     */
    public void receive(MemberId memberId) {
        this.progressTracker.triggerReplicationEvent();
    }

    /**
     * Fails fast when the availability guard reports that the database has been shut down.
     *
     * @throws DatabaseShutdownException if the database is shut down
     */
    private void assertDatabaseNotShutdown() {
        if (this.availabilityGuard.isShutdown()) {
            throw new DatabaseShutdownException("Database has been shutdown, transaction cannot be replicated.");
        }
    }
}
