package org.neo4j.coreedge.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.FutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.neo4j.coreedge.messaging.monitoring.MessageQueueMonitor;
import org.neo4j.logging.Log;

/* loaded from: input_file:org/neo4j/coreedge/messaging/NonBlockingChannel.class */
/**
 * Outbound message channel that never blocks the caller of {@link #send(Object)}.
 *
 * <p>Messages are placed on a bounded in-memory queue and written to a Netty channel by a
 * dedicated sender thread, which is started by the constructor. The sender thread also
 * establishes the connection and re-establishes it when the channel is found closed.
 * When the queue already holds {@code maxQueueSize} messages, new messages are dropped and
 * reported to the {@link MessageQueueMonitor}.
 *
 * <p>{@link #dispose()} stops the sender thread; calling {@link #send(Object)} afterwards
 * throws {@link IllegalStateException}.
 */
public class NonBlockingChannel {
    /** Pause, in milliseconds, after a failed connection attempt before trying again. */
    private static final int CONNECT_BACKOFF_IN_MS = 250;
    /** Longest time, in milliseconds, the sender thread parks between passes over the queue. */
    private static final int RETRY_DELAY_MS = 100;
    /** Time, in milliseconds, {@link #dispose()} waits for the sender thread per attempt. */
    private static final long DISPOSE_JOIN_TIMEOUT_MS = 100L;

    /** Current connection; read and written by the sender thread only. {@code null} when not connected. */
    private Channel nettyChannel;
    private final Bootstrap bootstrap;
    private final InetSocketAddress destination;
    private final MessageQueueMonitor monitor;
    private final int maxQueueSize;
    /** Attached to every write; logs the cause when a write future completes unsuccessfully. */
    FutureListener<Void> errorListener;
    private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<>();
    /** Cleared by {@link #dispose()}; volatile so the sender thread observes the change. */
    private volatile boolean stillRunning = true;
    private final Thread messageSendingThread = new Thread(this::messageSendingThreadWork);

    /**
     * Creates the channel and starts its sender thread.
     *
     * @param bootstrap Netty bootstrap used to open connections to the destination
     * @param inetSocketAddress address that messages are sent to
     * @param log receives an error entry for every write that fails
     * @param messageQueueMonitor notified of queue size changes and dropped messages
     * @param i maximum number of messages held in the queue before new ones are dropped
     */
    public NonBlockingChannel(Bootstrap bootstrap, InetSocketAddress inetSocketAddress, Log log, MessageQueueMonitor messageQueueMonitor, int i) {
        this.bootstrap = bootstrap;
        this.destination = inetSocketAddress;
        this.monitor = messageQueueMonitor;
        this.maxQueueSize = i;
        this.errorListener = future -> {
            if (!future.isSuccess()) {
                log.error("Failed to send message to " + inetSocketAddress, future.cause());
            }
        };
        // Started last so that every field the sender thread reads is already assigned.
        this.messageSendingThread.start();
    }

    /**
     * Body of the sender thread: connect, drain the queue, flush, then park until either
     * {@link #send(Object)} unparks the thread or the retry delay elapses.
     */
    private void messageSendingThreadWork() {
        while (this.stillRunning) {
            try {
                ensureConnected();
                if (sendMessages()) {
                    this.nettyChannel.flush();
                }
            } catch (IOException e) {
                // Recover from an I/O failure by discarding the channel; the next pass reconnects.
                closeChannel();
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(RETRY_DELAY_MS));
        }
        // Shutdown: release the connection and drop anything still queued. This runs even if
        // no connection was ever established, so queued messages are not retained and the
        // monitor does not keep reporting a stale queue size.
        closeChannel();
        this.messageQueue.clear();
        this.monitor.queueSize(this.destination, this.messageQueue.size());
    }

    /** Closes the current channel, if any, and forgets it. Called from the sender thread only. */
    private void closeChannel() {
        if (this.nettyChannel != null) {
            this.nettyChannel.close();
            this.nettyChannel = null;
        }
    }

    /**
     * Stops the sender thread and waits until it has terminated.
     *
     * <p>If the calling thread is interrupted while waiting, the wait continues until the
     * sender thread has stopped and the caller's interrupt status is restored before returning.
     */
    public void dispose() {
        this.stillRunning = false;
        boolean interrupted = false;
        while (this.messageSendingThread.isAlive()) {
            // Interrupting makes the sender's parkNanos calls return promptly.
            this.messageSendingThread.interrupt();
            try {
                this.messageSendingThread.join(DISPOSE_JOIN_TIMEOUT_MS);
            } catch (InterruptedException e) {
                // Keep waiting for the sender to stop, but do not lose the caller's interrupt.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues a message for asynchronous delivery and wakes the sender thread.
     *
     * <p>If the queue already holds {@code maxQueueSize} messages the message is dropped and
     * the monitor is told so; no exception is thrown in that case.
     *
     * @param obj the message to send
     * @throws IllegalStateException if {@link #dispose()} has already been called
     */
    public void send(Object obj) {
        if (!this.stillRunning) {
            throw new IllegalStateException("sending on disposed channel");
        }
        if (this.messageQueue.size() >= this.maxQueueSize) {
            this.monitor.droppedMessage(this.destination);
            return;
        }
        this.messageQueue.offer(obj);
        LockSupport.unpark(this.messageSendingThread);
        this.monitor.queueSize(this.destination, this.messageQueue.size());
    }

    /**
     * Writes every currently queued message to the channel without flushing.
     *
     * @return {@code true} if at least one message was written and a flush is needed;
     *         {@code false} if there is no channel or the queue was empty
     * @throws IOException declared so that the sender loop can treat I/O failures as a
     *         reason to reconnect
     */
    private boolean sendMessages() throws IOException {
        if (this.nettyChannel == null) {
            return false;
        }
        boolean wroteAny = false;
        Object message;
        // Peek first and remove only after the write has been issued, so a message is not
        // taken off the queue if write() itself throws.
        while ((message = this.messageQueue.peek()) != null) {
            this.nettyChannel.write(message).addListener(this.errorListener);
            this.messageQueue.poll();
            this.monitor.queueSize(this.destination, this.messageQueue.size());
            wroteAny = true;
        }
        return wroteAny;
    }

    /**
     * Ensures there is an open channel, connecting (and retrying with a back-off) as needed.
     * Returns without a channel only when the sender has been asked to stop.
     *
     * @throws IOException declared so that the sender loop can treat I/O failures as a
     *         reason to reconnect
     */
    private void ensureConnected() throws IOException {
        if (this.nettyChannel != null && !this.nettyChannel.isOpen()) {
            // The channel was closed underneath us; forget it so a new one is created below.
            this.nettyChannel = null;
        }
        while (this.nettyChannel == null && this.stillRunning) {
            ChannelFuture connect = this.bootstrap.connect(this.destination);
            Channel channel = connect.awaitUninterruptibly().channel();
            if (connect.isSuccess()) {
                channel.flush();
                this.nettyChannel = channel;
            } else {
                channel.close();
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(CONNECT_BACKOFF_IN_MS));
            }
        }
    }
}
