package org.neo4j.causalclustering.catchup.tx;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFailedException;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.storecopy.StreamingTransactionsFailedException;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.readreplica.UpstreamDatabaseSelectionException;
import org.neo4j.causalclustering.readreplica.UpstreamDatabaseStrategySelector;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess.class */
public class CatchupPollingProcess extends LifecycleAdapter {
    private final LocalDatabase localDatabase;
    private final Log log;
    private final Lifecycle startStopOnStoreCopy;
    private final StoreCopyProcess storeCopyProcess;
    private final Supplier<DatabaseHealth> databaseHealthSupplier;
    private final CatchUpClient catchUpClient;
    private final UpstreamDatabaseStrategySelector selectionStrategyPipeline;
    private final RenewableTimeoutService timeoutService;
    private final long txPullIntervalMillis;
    private final BatchingTxApplier applier;
    private final PullRequestMonitor pullRequestMonitor;
    private RenewableTimeoutService.RenewableTimeout timeout;
    private State state = State.TX_PULLING;
    private DatabaseHealth dbHealth;
    private CompletableFuture<Boolean> upToDateFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess$State.class */
    public enum State {
        TX_PULLING,
        STORE_COPYING,
        PANIC
    }

    /* loaded from: input_file:org/neo4j/causalclustering/catchup/tx/CatchupPollingProcess$Timeouts.class */
    enum Timeouts implements RenewableTimeoutService.TimeoutName {
        TX_PULLER_TIMEOUT
    }

    public CatchupPollingProcess(LogProvider logProvider, LocalDatabase localDatabase, Lifecycle lifecycle, CatchUpClient catchUpClient, UpstreamDatabaseStrategySelector upstreamDatabaseStrategySelector, RenewableTimeoutService renewableTimeoutService, long j, BatchingTxApplier batchingTxApplier, Monitors monitors, StoreCopyProcess storeCopyProcess, Supplier<DatabaseHealth> supplier) {
        this.localDatabase = localDatabase;
        this.log = logProvider.getLog(getClass());
        this.startStopOnStoreCopy = lifecycle;
        this.catchUpClient = catchUpClient;
        this.selectionStrategyPipeline = upstreamDatabaseStrategySelector;
        this.timeoutService = renewableTimeoutService;
        this.txPullIntervalMillis = j;
        this.applier = batchingTxApplier;
        this.pullRequestMonitor = (PullRequestMonitor) monitors.newMonitor(PullRequestMonitor.class, new String[0]);
        this.storeCopyProcess = storeCopyProcess;
        this.databaseHealthSupplier = supplier;
    }

    public synchronized void start() throws Throwable {
        this.timeout = this.timeoutService.create(Timeouts.TX_PULLER_TIMEOUT, this.txPullIntervalMillis, 0L, renewableTimeout -> {
            onTimeout();
        });
        this.dbHealth = this.databaseHealthSupplier.get();
        this.upToDateFuture = new CompletableFuture<>();
    }

    public Future<Boolean> upToDateFuture() throws InterruptedException {
        return this.upToDateFuture;
    }

    public void stop() throws Throwable {
        this.timeout.cancel();
    }

    public State state() {
        return this.state;
    }

    private void onTimeout() {
        try {
            switch (this.state) {
                case TX_PULLING:
                    pullTransactions();
                    break;
                case STORE_COPYING:
                    copyStore();
                    break;
                default:
                    throw new IllegalStateException("Tried to execute catchup but was in state " + this.state);
            }
        } catch (Throwable th) {
            panic(th);
        }
        if (this.state != State.PANIC) {
            this.timeout.renew();
        }
    }

    private synchronized void panic(Throwable th) {
        this.log.error("Unexpected issue in catchup process. No more catchup requests will be scheduled.", th);
        this.dbHealth.panic(th);
        this.upToDateFuture.completeExceptionally(th);
        this.state = State.PANIC;
    }

    private void pullTransactions() {
        try {
            MemberId bestUpstreamDatabase = this.selectionStrategyPipeline.bestUpstreamDatabase();
            StoreId storeId = this.localDatabase.storeId();
            boolean z = true;
            int i = 1;
            while (z) {
                z = pullAndApplyBatchOfTransactions(bestUpstreamDatabase, storeId, i);
                i++;
            }
        } catch (UpstreamDatabaseSelectionException e) {
            this.log.warn("Could not find upstream database from which to pull.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void handleTransaction(CommittedTransactionRepresentation committedTransactionRepresentation) {
        if (this.state == State.PANIC) {
            return;
        }
        try {
            this.applier.queue(committedTransactionRepresentation);
        } catch (Throwable th) {
            panic(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void streamComplete() {
        if (this.state == State.PANIC) {
            return;
        }
        try {
            this.applier.applyBatch();
        } catch (Throwable th) {
            panic(th);
        }
    }

    private boolean pullAndApplyBatchOfTransactions(MemberId memberId, StoreId storeId, int i) {
        long lastQueuedTxId = this.applier.lastQueuedTxId();
        this.pullRequestMonitor.txPullRequest(lastQueuedTxId);
        new TxPullRequest(lastQueuedTxId, storeId);
        this.log.debug("Pull transactions where tx id > %d [batch #%d]", new Object[]{Long.valueOf(lastQueuedTxId), Integer.valueOf(i)});
        try {
            switch ((CatchupResult) this.catchUpClient.makeBlockingRequest(memberId, r0, new CatchUpResponseAdaptor<CatchupResult>() { // from class: org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess.1
                @Override // org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor, org.neo4j.causalclustering.catchup.CatchUpResponseCallback
                public void onTxPullResponse(CompletableFuture<CatchupResult> completableFuture, TxPullResponse txPullResponse) {
                    CatchupPollingProcess.this.handleTransaction(txPullResponse.tx());
                }

                @Override // org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor, org.neo4j.causalclustering.catchup.CatchUpResponseCallback
                public void onTxStreamFinishedResponse(CompletableFuture<CatchupResult> completableFuture, TxStreamFinishedResponse txStreamFinishedResponse) {
                    CatchupPollingProcess.this.streamComplete();
                    completableFuture.complete(txStreamFinishedResponse.status());
                }
            })) {
                case SUCCESS_END_OF_BATCH:
                    return true;
                case SUCCESS_END_OF_STREAM:
                    this.log.debug("Successfully pulled transactions from %d", new Object[]{Long.valueOf(lastQueuedTxId)});
                    this.upToDateFuture.complete(true);
                    return false;
                case E_TRANSACTION_PRUNED:
                    this.log.info("Tx pull unable to get transactions starting from %d since transactions have been pruned. Attempting a store copy.", new Object[]{Long.valueOf(lastQueuedTxId)});
                    this.state = State.STORE_COPYING;
                    return false;
                default:
                    this.log.info("Tx pull request unable to get transactions > %d " + lastQueuedTxId);
                    return false;
            }
        } catch (CatchUpClientException e) {
            streamComplete();
            return false;
        }
    }

    private void copyStore() {
        try {
            downloadDatabase(this.selectionStrategyPipeline.bestUpstreamDatabase(), this.localDatabase.storeId());
        } catch (UpstreamDatabaseSelectionException e) {
            this.log.warn("Could not find upstream database from which to copy store", e);
        }
    }

    private void downloadDatabase(MemberId memberId, StoreId storeId) {
        RuntimeException runtimeException;
        try {
            this.localDatabase.stop();
            this.startStopOnStoreCopy.stop();
            try {
                this.storeCopyProcess.replaceWithStoreFrom(memberId, storeId);
                try {
                    this.localDatabase.start();
                    this.startStopOnStoreCopy.start();
                    this.state = State.TX_PULLING;
                    this.applier.refreshFromNewStore();
                } finally {
                }
            } catch (IOException | StoreCopyFailedException | StreamingTransactionsFailedException e) {
                this.log.warn(String.format("Error copying store from: %s. Will retry shortly.", memberId));
            }
        } finally {
        }
    }
}
