package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.collection.Pair;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler;

/* loaded from: input_file:org/neo4j/causalclustering/messaging/SenderService.class */
public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress, Message> {
    private final ChannelInitializer channelInitializer;
    private final Log log;
    private JobScheduler.JobHandle jobHandle;
    private boolean senderServiceRunning;
    private Bootstrap bootstrap;
    private NioEventLoopGroup eventLoopGroup;
    private final ReadWriteLock serviceLock = new ReentrantReadWriteLock();
    private ReconnectingChannels channels = new ReconnectingChannels();

    public SenderService(ChannelInitializer channelInitializer, LogProvider logProvider) {
        this.channelInitializer = channelInitializer;
        this.log = logProvider.getLog(getClass());
    }

    @Override // org.neo4j.causalclustering.messaging.Outbound
    public void send(AdvertisedSocketAddress advertisedSocketAddress, Message message, boolean z) {
        this.serviceLock.readLock().lock();
        try {
            if (this.senderServiceRunning) {
                Future<Void> writeAndFlush = channel(advertisedSocketAddress).writeAndFlush(message);
                this.serviceLock.readLock().unlock();
                if (z) {
                    try {
                        writeAndFlush.get();
                    } catch (InterruptedException e) {
                        this.log.info("Interrupted while sending", e);
                    } catch (ExecutionException e2) {
                        this.log.error("Exception while sending to: " + advertisedSocketAddress, e2);
                    }
                }
            }
        } finally {
            this.serviceLock.readLock().unlock();
        }
    }

    private Channel channel(AdvertisedSocketAddress advertisedSocketAddress) {
        ReconnectingChannel reconnectingChannel = this.channels.get(advertisedSocketAddress);
        if (reconnectingChannel == null) {
            reconnectingChannel = new ReconnectingChannel(this.bootstrap, this.eventLoopGroup.next(), advertisedSocketAddress, this.log);
            reconnectingChannel.start();
            ReconnectingChannel putIfAbsent = this.channels.putIfAbsent(advertisedSocketAddress, reconnectingChannel);
            if (putIfAbsent != null) {
                reconnectingChannel.dispose();
                reconnectingChannel = putIfAbsent;
            } else {
                this.log.info("Creating channel to: [%s] ", new Object[]{advertisedSocketAddress});
            }
        }
        return reconnectingChannel;
    }

    public synchronized void start() {
        this.serviceLock.writeLock().lock();
        try {
            this.eventLoopGroup = new NioEventLoopGroup(0, new NamedThreadFactory("sender-service"));
            this.bootstrap = new Bootstrap().group(this.eventLoopGroup).channel(NioSocketChannel.class).handler(this.channelInitializer);
            this.senderServiceRunning = true;
        } finally {
            this.serviceLock.writeLock().unlock();
        }
    }

    public synchronized void stop() {
        this.serviceLock.writeLock().lock();
        try {
            this.senderServiceRunning = false;
            if (this.jobHandle != null) {
                this.jobHandle.cancel(true);
                this.jobHandle = null;
            }
            Iterator<ReconnectingChannel> it = this.channels.values().iterator();
            while (it.hasNext()) {
                it.next().dispose();
                it.remove();
            }
            try {
                this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MICROSECONDS).sync();
            } catch (InterruptedException e) {
                this.log.warn("Interrupted while stopping sender service.");
            }
        } finally {
            this.serviceLock.writeLock().unlock();
        }
    }

    public Stream<Pair<AdvertisedSocketAddress, ProtocolStack>> installedProtocols() {
        return this.channels.installedProtocols();
    }
}
