package org.neo4j.coreedge.catchup.storecopy;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.neo4j.coreedge.catchup.RequestMessageType;
import org.neo4j.coreedge.catchup.storecopy.core.CoreSnapshotListener;
import org.neo4j.coreedge.catchup.storecopy.core.CoreSnapshotRequest;
import org.neo4j.coreedge.catchup.storecopy.edge.GetStoreIdRequest;
import org.neo4j.coreedge.catchup.storecopy.edge.GetStoreRequest;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFileReceiver;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFileStreamingCompleteListener;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreFileStreams;
import org.neo4j.coreedge.catchup.storecopy.edge.StoreIdReceiver;
import org.neo4j.coreedge.catchup.tx.edge.PullRequestMonitor;
import org.neo4j.coreedge.catchup.tx.edge.TxPullRequest;
import org.neo4j.coreedge.catchup.tx.edge.TxPullResponse;
import org.neo4j.coreedge.catchup.tx.edge.TxPullResponseListener;
import org.neo4j.coreedge.catchup.tx.edge.TxStreamCompleteListener;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.net.CoreOutbound;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.state.CoreSnapshot;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.NonBlockingChannels;
import org.neo4j.coreedge.server.SenderService;
import org.neo4j.coreedge.server.StoreId;
import org.neo4j.helpers.Listeners;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/catchup/storecopy/CoreClient.class */
/**
 * Base class for a catch-up client that sends requests to a core member and relays the responses.
 *
 * <p>Requests (store copy, store id, core snapshot, transaction pull) are written through an
 * {@link Outbound} backed by a {@link SenderService}. Responses arrive through the listener
 * interfaces this class implements and are either fanned out to registered listeners, handed to
 * the configured store-id consumer, or used to complete the pending snapshot future.
 *
 * <p>NOTE(review): the mutable fields below are plain (non-volatile) fields; whether requests and
 * response callbacks run on the same thread is not visible from this class — confirm with callers.
 */
public abstract class CoreClient extends LifecycleAdapter implements StoreFileReceiver, StoreIdReceiver, StoreFileStreamingCompleteListener, TxStreamCompleteListener, TxPullResponseListener, CoreSnapshotListener {
    private final SenderService senderService;
    private final Outbound<CoreMember, Message> outbound;
    private final PullRequestMonitor pullRequestMonitor;

    private final Listeners<StoreFileStreamingCompleteListener> storeFileStreamingCompleteListeners = new Listeners<>();
    private final Listeners<TxStreamCompleteListener> txStreamCompleteListeners = new Listeners<>();
    private final Listeners<TxPullResponseListener> txPullResponseListeners = new Listeners<>();

    /** Returned from {@link #getStoreFileStreams()}; null until {@link #setStoreFileStreams} is called. */
    private StoreFileStreams storeFileStreams;
    /** Receives store ids in {@link #onStoreIdReceived}; null until {@link #setStoreIdConsumer} is called. */
    private Consumer<StoreId> storeIdConsumer;
    /** Future of the most recent {@link #requestCoreSnapshot} call; null before the first call. */
    private CompletableFuture<CoreSnapshot> coreSnapshotFuture;

    /**
     * Creates the sender service, the outbound used for all requests, and the pull-request monitor.
     *
     * @param logProvider passed to both the {@link SenderService} and the {@link CoreOutbound}
     * @param channelInitializer passed to the {@link SenderService}
     * @param monitors passed to the {@link SenderService} and used to create the {@link PullRequestMonitor}
     * @param i passed through to the {@link SenderService} constructor (presumably a queue size — TODO confirm)
     * @param nonBlockingChannels passed to the {@link SenderService}
     * @param topologyService passed to the {@link CoreOutbound}
     * @param j passed through to the {@link CoreOutbound} constructor (presumably a logging threshold — TODO confirm)
     */
    public CoreClient(LogProvider logProvider, ChannelInitializer<SocketChannel> channelInitializer, Monitors monitors, int i, NonBlockingChannels nonBlockingChannels, TopologyService topologyService, long j) {
        this.senderService = new SenderService(channelInitializer, logProvider, monitors, i, nonBlockingChannels);
        this.outbound = new CoreOutbound(topologyService, this.senderService, logProvider, j);
        this.pullRequestMonitor = (PullRequestMonitor) monitors.newMonitor(PullRequestMonitor.class, new String[0]);
    }

    /**
     * Sends a {@link GetStoreRequest} to the given member.
     *
     * @param coreMember the member to send the request to
     */
    public void requestStore(CoreMember coreMember) {
        send(coreMember, RequestMessageType.STORE, new GetStoreRequest());
    }

    /**
     * Sends a {@link GetStoreIdRequest} to the given member.
     *
     * @param coreMember the member to send the request to
     */
    public void requestStoreId(CoreMember coreMember) {
        send(coreMember, RequestMessageType.STORE_ID, new GetStoreIdRequest());
    }

    /**
     * Sends a {@link CoreSnapshotRequest} to the given member.
     *
     * <p>Each call replaces the pending future, so only the future returned by the most recent call
     * is completed by {@link #onSnapshotReceived}.
     *
     * @param coreMember the member to send the request to
     * @return a future completed with the snapshot passed to {@link #onSnapshotReceived}
     */
    public CompletableFuture<CoreSnapshot> requestCoreSnapshot(CoreMember coreMember) {
        // Install the future before sending so it is in place when the response callback fires.
        CompletableFuture<CoreSnapshot> pending = new CompletableFuture<>();
        this.coreSnapshotFuture = pending;
        send(coreMember, RequestMessageType.RAFT_STATE, new CoreSnapshotRequest());
        return pending;
    }

    /**
     * Sends a {@link TxPullRequest} to the given member and then reports it to the pull-request monitor.
     *
     * @param coreMember the member to send the request to
     * @param j value carried by the request and reported to the monitor (presumably a transaction id — TODO confirm)
     */
    public void pollForTransactions(CoreMember coreMember, long j) {
        send(coreMember, RequestMessageType.TX_PULL_REQUEST, new TxPullRequest(j));
        this.pullRequestMonitor.txPullRequest(j);
    }

    /**
     * Sends the request type marker followed by the request message, in that order, to one member.
     */
    private void send(CoreMember coreMember, RequestMessageType requestMessageType, Message message) {
        // The destination is the member itself; it must not be cast to Outbound.
        Collection<Message> payload = Arrays.asList(requestMessageType, message);
        this.outbound.send(coreMember, payload);
    }

    /**
     * Starts the underlying {@link SenderService}.
     *
     * @throws Throwable whatever the sender service throws on start
     */
    public void start() throws Throwable {
        this.senderService.start();
    }

    /**
     * Stops the underlying {@link SenderService}.
     *
     * @throws Throwable whatever the sender service throws on stop
     */
    public void stop() throws Throwable {
        this.senderService.stop();
    }

    /** Registers a listener to be notified from {@link #onTxReceived}. */
    public void addTxPullResponseListener(TxPullResponseListener txPullResponseListener) {
        this.txPullResponseListeners.add(txPullResponseListener);
    }

    /** Unregisters a listener previously added with {@link #addTxPullResponseListener}. */
    public void removeTxPullResponseListener(TxPullResponseListener txPullResponseListener) {
        this.txPullResponseListeners.remove(txPullResponseListener);
    }

    /** Registers a listener to be notified from {@link #onFileStreamingComplete}. */
    public void addStoreFileStreamingCompleteListener(StoreFileStreamingCompleteListener storeFileStreamingCompleteListener) {
        this.storeFileStreamingCompleteListeners.add(storeFileStreamingCompleteListener);
    }

    /** Unregisters a listener previously added with {@link #addStoreFileStreamingCompleteListener}. */
    public void removeStoreFileStreamingCompleteListener(StoreFileStreamingCompleteListener storeFileStreamingCompleteListener) {
        this.storeFileStreamingCompleteListeners.remove(storeFileStreamingCompleteListener);
    }

    /** Registers a listener to be notified from {@link #onTxStreamingComplete}. */
    public void addTxStreamCompleteListener(TxStreamCompleteListener txStreamCompleteListener) {
        this.txStreamCompleteListeners.add(txStreamCompleteListener);
    }

    /** Unregisters a listener previously added with {@link #addTxStreamCompleteListener}. */
    public void removeTxStreamCompleteListener(TxStreamCompleteListener txStreamCompleteListener) {
        this.txStreamCompleteListeners.remove(txStreamCompleteListener);
    }

    /**
     * @return the streams last passed to {@link #setStoreFileStreams}, or null if none were set
     */
    @Override
    public StoreFileStreams getStoreFileStreams() {
        return this.storeFileStreams;
    }

    /** Sets the streams returned by {@link #getStoreFileStreams()}. */
    public void setStoreFileStreams(StoreFileStreams storeFileStreams) {
        this.storeFileStreams = storeFileStreams;
    }

    /** Sets the consumer invoked by {@link #onStoreIdReceived}. */
    public void setStoreIdConsumer(Consumer<StoreId> consumer) {
        this.storeIdConsumer = consumer;
    }

    /** Forwards the event, with the same argument, to every registered file-streaming-complete listener. */
    @Override
    public void onFileStreamingComplete(long j) {
        this.storeFileStreamingCompleteListeners.notify(listener -> listener.onFileStreamingComplete(j));
    }

    /** Forwards the event, with the same argument, to every registered tx-stream-complete listener. */
    @Override
    public void onTxStreamingComplete(long j) {
        this.txStreamCompleteListeners.notify(listener -> listener.onTxStreamingComplete(j));
    }

    /** Forwards the received response to every registered tx-pull-response listener. */
    @Override
    public void onTxReceived(TxPullResponse txPullResponse) {
        this.txPullResponseListeners.notify(listener -> listener.onTxReceived(txPullResponse));
    }

    /**
     * Hands the received store id to the configured consumer.
     *
     * <p>Requires {@link #setStoreIdConsumer} to have been called; the consumer is dereferenced
     * without a null check.
     */
    @Override
    public void onStoreIdReceived(StoreId storeId) {
        this.storeIdConsumer.accept(storeId);
    }

    /**
     * Completes the future created by the most recent {@link #requestCoreSnapshot} call.
     *
     * <p>Requires a prior {@link #requestCoreSnapshot} call; the future is dereferenced without a
     * null check.
     */
    @Override
    public void onSnapshotReceived(CoreSnapshot coreSnapshot) {
        this.coreSnapshotFuture.complete(coreSnapshot);
    }
}
