package org.neo4j.coreedge.catchup;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.time.Clock;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.neo4j.coreedge.discovery.NoKnownAddressesException;
import org.neo4j.coreedge.discovery.TopologyService;
import org.neo4j.coreedge.identity.MemberId;
import org.neo4j.coreedge.messaging.CatchUpRequest;
import org.neo4j.coreedge.messaging.address.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/catchup/CatchUpClient.class */
public class CatchUpClient extends LifecycleAdapter {
    private final LogProvider logProvider;
    private final TopologyService discoveryService;
    private final Clock clock;
    private final Map<AdvertisedSocketAddress, CatchUpChannel> idleChannels = new HashMap();
    private final Set<CatchUpChannel> activeChannels = new HashSet();
    private final Log log;
    private NioEventLoopGroup eventLoopGroup;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/coreedge/catchup/CatchUpClient$CatchUpChannel.class */
    public class CatchUpChannel {
        private final TrackingResponseHandler handler;
        private final AdvertisedSocketAddress destination;
        private Channel nettyChannel;

        CatchUpChannel(AdvertisedSocketAddress advertisedSocketAddress) {
            this.destination = advertisedSocketAddress;
            this.handler = new TrackingResponseHandler(new CatchUpResponseAdaptor(), CatchUpClient.this.clock);
            this.nettyChannel = new Bootstrap().group(CatchUpClient.this.eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { // from class: org.neo4j.coreedge.catchup.CatchUpClient.CatchUpChannel.1
                /* JADX INFO: Access modifiers changed from: protected */
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    CatchUpClientChannelPipeline.initChannel(socketChannel, CatchUpChannel.this.handler, CatchUpClient.this.logProvider);
                }
            }).connect(advertisedSocketAddress.socketAddress()).awaitUninterruptibly().channel();
        }

        void setResponseHandler(CatchUpResponseCallback catchUpResponseCallback, CompletableFuture<?> completableFuture) {
            this.handler.setResponseHandler(catchUpResponseCallback, completableFuture);
        }

        void send(CatchUpRequest catchUpRequest) {
            this.nettyChannel.write(catchUpRequest.messageType());
            this.nettyChannel.writeAndFlush(catchUpRequest);
        }

        long millisSinceLastResponse() {
            return CatchUpClient.this.clock.millis() - this.handler.lastResponseTime();
        }

        void close() {
            this.nettyChannel.close();
        }
    }

    public CatchUpClient(TopologyService topologyService, LogProvider logProvider, Clock clock) {
        this.logProvider = logProvider;
        this.discoveryService = topologyService;
        this.log = logProvider.getLog(getClass());
        this.clock = clock;
    }

    public <T> T makeBlockingRequest(MemberId memberId, CatchUpRequest catchUpRequest, long j, TimeUnit timeUnit, CatchUpResponseCallback<T> catchUpResponseCallback) throws CatchUpClientException, NoKnownAddressesException {
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        CatchUpChannel acquireChannel = acquireChannel(memberId);
        completableFuture.whenComplete((obj, th) -> {
            if (th == null) {
                release(acquireChannel);
            } else {
                dispose(acquireChannel);
            }
        });
        acquireChannel.setResponseHandler(catchUpResponseCallback, completableFuture);
        acquireChannel.send(catchUpRequest);
        String format = String.format("Timed out executing operation %s on %s", catchUpRequest, memberId);
        acquireChannel.getClass();
        return (T) TimeoutLoop.waitForCompletion(completableFuture, format, acquireChannel::millisSinceLastResponse, j, timeUnit);
    }

    private synchronized void dispose(CatchUpChannel catchUpChannel) {
        this.activeChannels.remove(catchUpChannel);
        catchUpChannel.close();
    }

    private synchronized void release(CatchUpChannel catchUpChannel) {
        this.activeChannels.remove(catchUpChannel);
        this.idleChannels.put(catchUpChannel.destination, catchUpChannel);
    }

    private synchronized CatchUpChannel acquireChannel(MemberId memberId) throws NoKnownAddressesException {
        AdvertisedSocketAddress catchupServer = this.discoveryService.currentTopology().coreAddresses(memberId).getCatchupServer();
        CatchUpChannel remove = this.idleChannels.remove(catchupServer);
        if (remove == null) {
            remove = new CatchUpChannel(catchupServer);
        }
        this.activeChannels.add(remove);
        return remove;
    }

    public void start() {
        this.eventLoopGroup = new NioEventLoopGroup(0, new NamedThreadFactory("catch-up-client"));
    }

    public void stop() throws Throwable {
        try {
            this.idleChannels.values().forEach((v0) -> {
                v0.close();
            });
            this.activeChannels.forEach((v0) -> {
                v0.close();
            });
            this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MICROSECONDS).sync();
        } catch (InterruptedException e) {
            this.log.warn("Interrupted while stopping catch up client.");
        }
    }
}
