package org.neo4j.coreedge.server;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.neo4j.coreedge.network.Message;
import org.neo4j.coreedge.raft.net.NonBlockingChannel;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.net.monitoring.MessageQueueMonitor;
import org.neo4j.coreedge.server.Expiration;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/* loaded from: input_file:org/neo4j/coreedge/server/SenderService.class */
public class SenderService extends LifecycleAdapter implements Outbound<AdvertisedSocketAddress> {
    private final Expiration expiration;
    private final ExpiryScheduler scheduler;
    private final ChannelInitializer<SocketChannel> channelInitializer;
    private final Log log;
    private final Monitors monitors;
    private JobScheduler.JobHandle jobHandle;
    private boolean senderServiceRunning;
    private Bootstrap bootstrap;
    private NioEventLoopGroup eventLoopGroup;
    private int maxQueueSize;
    private final ConcurrentHashMap<AdvertisedSocketAddress, TimestampedNonBlockingChannel> lazyChannelMap = new ConcurrentHashMap<>();
    private final ReadWriteLock serviceLock = new ReentrantReadWriteLock();

    /* JADX INFO: Access modifiers changed from: private */
    @ChannelHandler.Sharable
    /* loaded from: input_file:org/neo4j/coreedge/server/SenderService$InboundKeepAliveHandler.class */
    public class InboundKeepAliveHandler extends ChannelInboundHandlerAdapter {
        private final Expiration.ExpirationTime expirationTime;

        public InboundKeepAliveHandler(Expiration.ExpirationTime expirationTime) {
            this.expirationTime = expirationTime;
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            this.expirationTime.renew();
            super.channelRead(channelHandlerContext, obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/coreedge/server/SenderService$TimestampedNonBlockingChannel.class */
    public final class TimestampedNonBlockingChannel {
        private final Expiration.ExpirationTime endOfLife;
        private NonBlockingChannel channel;

        public TimestampedNonBlockingChannel(Expiration.ExpirationTime expirationTime, NonBlockingChannel nonBlockingChannel) {
            this.endOfLife = expirationTime;
            this.channel = nonBlockingChannel;
        }

        public NonBlockingChannel get() {
            return this.channel;
        }

        public Expiration.ExpirationTime getEndOfLife() {
            return this.endOfLife;
        }
    }

    public SenderService(ExpiryScheduler expiryScheduler, Expiration expiration, ChannelInitializer<SocketChannel> channelInitializer, LogProvider logProvider, Monitors monitors, int i) {
        this.expiration = expiration;
        this.scheduler = expiryScheduler;
        this.channelInitializer = channelInitializer;
        this.log = logProvider.getLog(getClass());
        this.monitors = monitors;
        this.maxQueueSize = i;
    }

    @Override // org.neo4j.coreedge.raft.net.Outbound
    public void send(AdvertisedSocketAddress advertisedSocketAddress, Message... messageArr) {
        this.serviceLock.readLock().lock();
        try {
            if (this.senderServiceRunning) {
                MessageQueueMonitor messageQueueMonitor = (MessageQueueMonitor) this.monitors.newMonitor(MessageQueueMonitor.class, NonBlockingChannel.class, new String[0]);
                NonBlockingChannel nonBlockingChannel = getAndUpdateLife(advertisedSocketAddress, messageQueueMonitor).get();
                messageQueueMonitor.register(advertisedSocketAddress.socketAddress());
                for (Message message : messageArr) {
                    nonBlockingChannel.send(message);
                }
                this.serviceLock.readLock().unlock();
            }
        } finally {
            this.serviceLock.readLock().unlock();
        }
    }

    public int activeChannelCount() {
        return this.lazyChannelMap.size();
    }

    private TimestampedNonBlockingChannel getAndUpdateLife(AdvertisedSocketAddress advertisedSocketAddress, MessageQueueMonitor messageQueueMonitor) {
        TimestampedNonBlockingChannel timestampedNonBlockingChannel = this.lazyChannelMap.get(advertisedSocketAddress);
        if (timestampedNonBlockingChannel == null) {
            Expiration expiration = this.expiration;
            expiration.getClass();
            Expiration.ExpirationTime expirationTime = new Expiration.ExpirationTime();
            timestampedNonBlockingChannel = new TimestampedNonBlockingChannel(expirationTime, new NonBlockingChannel(this.bootstrap, advertisedSocketAddress.socketAddress(), new InboundKeepAliveHandler(expirationTime), this.log, messageQueueMonitor, this.maxQueueSize));
            TimestampedNonBlockingChannel putIfAbsent = this.lazyChannelMap.putIfAbsent(advertisedSocketAddress, timestampedNonBlockingChannel);
            if (putIfAbsent != null) {
                timestampedNonBlockingChannel.get().dispose();
                timestampedNonBlockingChannel = putIfAbsent;
            }
        }
        timestampedNonBlockingChannel.getEndOfLife().renew();
        return timestampedNonBlockingChannel;
    }

    public synchronized void start() {
        this.serviceLock.writeLock().lock();
        try {
            this.eventLoopGroup = new NioEventLoopGroup(0, new NamedThreadFactory("sender-service"));
            this.bootstrap = new Bootstrap().group(this.eventLoopGroup).channel(NioSocketChannel.class).handler(this.channelInitializer);
            if (this.scheduler != null) {
                this.jobHandle = this.scheduler.schedule(this::reapDeadChannels);
            }
            this.senderServiceRunning = true;
        } finally {
            this.serviceLock.writeLock().unlock();
        }
    }

    public synchronized void stop() {
        this.serviceLock.writeLock().lock();
        try {
            this.senderServiceRunning = false;
            if (this.jobHandle != null) {
                this.jobHandle.cancel(true);
                this.jobHandle = null;
            }
            Iterator<TimestampedNonBlockingChannel> it = this.lazyChannelMap.values().iterator();
            while (it.hasNext()) {
                it.next().get().dispose();
                it.remove();
            }
            try {
                this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MICROSECONDS).sync();
            } catch (InterruptedException e) {
                this.log.warn("Interrupted while stopping sender service.");
            }
        } finally {
            this.serviceLock.writeLock().unlock();
        }
    }

    private synchronized void reapDeadChannels() {
        Iterator<TimestampedNonBlockingChannel> it = this.lazyChannelMap.values().iterator();
        while (it.hasNext()) {
            TimestampedNonBlockingChannel next = it.next();
            this.serviceLock.writeLock().lock();
            try {
                if (next.getEndOfLife().expired()) {
                    next.get().dispose();
                    it.remove();
                }
            } finally {
                this.serviceLock.writeLock().unlock();
            }
        }
    }
}
