package org.neo4j.coreedge.catchup;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.neo4j.coreedge.catchup.CatchupServerProtocol;
import org.neo4j.coreedge.catchup.RequestMessageType;
import org.neo4j.coreedge.catchup.ResponseMessageType;
import org.neo4j.coreedge.catchup.storecopy.FileHeaderEncoder;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.coreedge.catchup.storecopy.GetStoreIdRequestHandler;
import org.neo4j.coreedge.catchup.storecopy.GetStoreRequest;
import org.neo4j.coreedge.catchup.storecopy.GetStoreRequestHandler;
import org.neo4j.coreedge.catchup.storecopy.StoreCopyFinishedResponseEncoder;
import org.neo4j.coreedge.catchup.tx.TxPullRequestDecoder;
import org.neo4j.coreedge.catchup.tx.TxPullRequestHandler;
import org.neo4j.coreedge.catchup.tx.TxPullResponseEncoder;
import org.neo4j.coreedge.catchup.tx.TxStreamFinishedResponseEncoder;
import org.neo4j.coreedge.core.state.CoreState;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotEncoder;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequest;
import org.neo4j.coreedge.core.state.snapshot.CoreSnapshotRequestHandler;
import org.neo4j.coreedge.identity.StoreId;
import org.neo4j.coreedge.logging.ExceptionLoggingHandler;
import org.neo4j.coreedge.messaging.address.ListenSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/catchup/CatchupServer.class */
public class CatchupServer extends LifecycleAdapter {
    private final LogProvider logProvider;
    private Monitors monitors;
    private final Supplier<StoreId> storeIdSupplier;
    private final Supplier<TransactionIdStore> transactionIdStoreSupplier;
    private final Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier;
    private final Supplier<NeoStoreDataSource> dataSourceSupplier;
    private final NamedThreadFactory threadFactory = new NamedThreadFactory("catchup-server");
    private final CoreState coreState;
    private final ListenSocketAddress listenAddress;
    private EventLoopGroup workerGroup;
    private Channel channel;
    private Supplier<CheckPointer> checkPointerSupplier;
    private Log log;

    public CatchupServer(LogProvider logProvider, Supplier<StoreId> supplier, Supplier<TransactionIdStore> supplier2, Supplier<LogicalTransactionStore> supplier3, Supplier<NeoStoreDataSource> supplier4, Supplier<CheckPointer> supplier5, CoreState coreState, ListenSocketAddress listenSocketAddress, Monitors monitors) {
        this.coreState = coreState;
        this.listenAddress = listenSocketAddress;
        this.transactionIdStoreSupplier = supplier2;
        this.storeIdSupplier = supplier;
        this.logicalTransactionStoreSupplier = supplier3;
        this.logProvider = logProvider;
        this.monitors = monitors;
        this.log = logProvider.getLog(getClass());
        this.dataSourceSupplier = supplier4;
        this.checkPointerSupplier = supplier5;
    }

    public synchronized void start() throws Throwable {
        this.workerGroup = new NioEventLoopGroup(0, this.threadFactory);
        this.channel = new ServerBootstrap().group(this.workerGroup).channel(NioServerSocketChannel.class).localAddress(this.listenAddress.socketAddress()).childHandler(new ChannelInitializer<SocketChannel>() { // from class: org.neo4j.coreedge.catchup.CatchupServer.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void initChannel(SocketChannel socketChannel) throws Exception {
                CatchupServerProtocol catchupServerProtocol = new CatchupServerProtocol();
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)});
                pipeline.addLast(new ChannelHandler[]{new LengthFieldPrepender(4)});
                pipeline.addLast(new ChannelHandler[]{new ResponseMessageType.Encoder()});
                pipeline.addLast(new ChannelHandler[]{new RequestMessageType.Encoder()});
                pipeline.addLast(new ChannelHandler[]{new TxPullResponseEncoder()});
                pipeline.addLast(new ChannelHandler[]{new CoreSnapshotEncoder()});
                pipeline.addLast(new ChannelHandler[]{new StoreCopyFinishedResponseEncoder()});
                pipeline.addLast(new ChannelHandler[]{new TxStreamFinishedResponseEncoder()});
                pipeline.addLast(new ChannelHandler[]{new FileHeaderEncoder()});
                pipeline.addLast(new ChannelHandler[]{new ServerMessageTypeHandler(catchupServerProtocol, CatchupServer.this.logProvider)});
                pipeline.addLast(new ChannelHandler[]{CatchupServer.this.decoders(catchupServerProtocol)});
                Predicate predicate = message -> {
                    return message.version() == 0;
                };
                pipeline.addLast(new ChannelHandler[]{new TxPullRequestHandler(predicate, catchupServerProtocol, CatchupServer.this.storeIdSupplier, CatchupServer.this.transactionIdStoreSupplier, CatchupServer.this.logicalTransactionStoreSupplier, CatchupServer.this.monitors, CatchupServer.this.logProvider)});
                pipeline.addLast(new ChannelHandler[]{new ChunkedWriteHandler()});
                pipeline.addLast(new ChannelHandler[]{new GetStoreRequestHandler(predicate, catchupServerProtocol, CatchupServer.this.dataSourceSupplier, CatchupServer.this.checkPointerSupplier, CatchupServer.this.logProvider)});
                pipeline.addLast(new ChannelHandler[]{new GetStoreIdRequestHandler(predicate, catchupServerProtocol, CatchupServer.this.storeIdSupplier, CatchupServer.this.logProvider)});
                pipeline.addLast(new ChannelHandler[]{new CoreSnapshotRequestHandler(predicate, catchupServerProtocol, CatchupServer.this.coreState, CatchupServer.this.logProvider)});
                pipeline.addLast(new ChannelHandler[]{new ExceptionLoggingHandler(CatchupServer.this.log)});
            }
        }).bind().syncUninterruptibly().channel();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ChannelInboundHandler decoders(CatchupServerProtocol catchupServerProtocol) {
        RequestDecoderDispatcher requestDecoderDispatcher = new RequestDecoderDispatcher(catchupServerProtocol, this.logProvider);
        requestDecoderDispatcher.register(CatchupServerProtocol.State.TX_PULL, new TxPullRequestDecoder());
        requestDecoderDispatcher.register(CatchupServerProtocol.State.GET_STORE, new SimpleRequestDecoder((v1) -> {
            return new GetStoreRequest(v1);
        }));
        requestDecoderDispatcher.register(CatchupServerProtocol.State.GET_STORE_ID, new SimpleRequestDecoder((v1) -> {
            return new GetStoreIdRequest(v1);
        }));
        requestDecoderDispatcher.register(CatchupServerProtocol.State.GET_RAFT_STATE, new SimpleRequestDecoder((v1) -> {
            return new CoreSnapshotRequest(v1);
        }));
        return requestDecoderDispatcher;
    }

    public synchronized void stop() throws Throwable {
        try {
            this.channel.close().sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.log.warn("Interrupted while closing channel.");
        }
        if (this.workerGroup.shutdownGracefully(2L, 5L, TimeUnit.SECONDS).awaitUninterruptibly(10L, TimeUnit.SECONDS)) {
            this.log.warn("Worker group not shutdown within 10 seconds.");
        }
    }
}
