package com.foilen.smalltools.net.netty;

import com.foilen.smalltools.tools.AssertTools;
import com.foilen.smalltools.tools.CloseableTools;
import com.foilen.smalltools.tools.ThreadTools;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/foilen/smalltools/net/netty/NettyClientMessagingQueue.class */
/**
 * An outgoing message queue bound to one Netty {@link Channel}, drained by its own daemon thread.
 *
 * <p>
 * Instances are obtained through {@link #getInstance(Channel)} or {@link #getInstance(NettyClient)}; there is at
 * most one registered queue per channel. Messages handed to {@link #send(Object)} are buffered (up to
 * {@value #QUEUE_CAPACITY} entries) and written one at a time by the queue's thread using
 * {@code NettyClient.writeFlushWait}.
 * </p>
 *
 * <p>
 * A shared daemon thread, started when the first queue is created, periodically closes and unregisters the queues
 * whose client is no longer connected.
 * </p>
 */
public class NettyClientMessagingQueue extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(NettyClientMessagingQueue.class);
    private static final ConcurrentHashMap<Channel, NettyClientMessagingQueue> messagingQueueByChannel = new ConcurrentHashMap<>();

    /** Maximum number of messages buffered per queue before {@link #send(Object)} blocks. */
    private static final int QUEUE_CAPACITY = 1000;
    /** Delay between two sweeps of the clean-up task (5 minutes). */
    private static final long CLEANUP_INTERVAL_MS = 300000L;
    /** Pause before retrying a message whose write failed with a {@link ChannelException}. */
    private static final long RETRY_DELAY_MS = 5000L;

    /** Guarded by the class lock held in {@link #getNewInstance(Channel)}. */
    private static boolean cleanupTaskStarted = false;

    // Written by setNettyClient() from arbitrary threads and read by the sender and clean-up threads.
    private volatile NettyClient nettyClient;
    private final LinkedBlockingDeque<Object> msgQueue = new LinkedBlockingDeque<>(QUEUE_CAPACITY);
    private final AtomicBoolean closeRequested = new AtomicBoolean();

    /**
     * Get the messaging queue registered for the channel, creating and starting one if there is none yet.
     *
     * @param channel
     *            the channel to send messages on
     * @return the queue for that channel or {@code null} if the channel is {@code null}
     */
    public static NettyClientMessagingQueue getInstance(Channel channel) {
        if (channel == null) {
            return null;
        }
        // Lock-free fast path; creation is serialized in getNewInstance().
        NettyClientMessagingQueue existing = messagingQueueByChannel.get(channel);
        if (existing != null) {
            return existing;
        }
        return getNewInstance(channel);
    }

    /**
     * Get the messaging queue registered for the channel of the given client, creating one if needed.
     *
     * @param nettyClient
     *            the client whose channel is used as the lookup key
     * @return the queue for that channel or {@code null} if the client or its channel is {@code null}
     */
    public static NettyClientMessagingQueue getInstance(NettyClient nettyClient) {
        if (nettyClient == null) {
            return null;
        }
        return getInstance(nettyClient.channel);
    }

    /**
     * Create, register and start a queue for the channel unless another thread did it in the meantime. Also starts
     * the shared clean-up thread the first time a queue is created.
     *
     * @param channel
     *            the channel to create the queue for
     * @return the registered queue for that channel
     */
    private static synchronized NettyClientMessagingQueue getNewInstance(Channel channel) {
        // Re-check under the lock: a concurrent caller may have registered the queue already.
        NettyClientMessagingQueue queue = messagingQueueByChannel.get(channel);
        if (queue != null) {
            return queue;
        }

        NettyClient client = new NettyClient(channel);
        queue = new NettyClientMessagingQueue(client);
        logger.info("[{}] New messaging queue", client.getPeerIp());
        messagingQueueByChannel.put(channel, queue);

        if (!cleanupTaskStarted) {
            cleanupTaskStarted = true;
            startCleanupTask();
        }
        return queue;
    }

    /** Start the daemon thread that periodically unregisters the queues that are no longer connected. */
    private static void startCleanupTask() {
        Thread cleanupThread = new Thread(() -> {
            while (true) {
                ThreadTools.sleep(CLEANUP_INTERVAL_MS);
                logger.debug("Checking for stopped messaging queues");
                try {
                    removeDisconnectedQueues();
                } catch (RuntimeException e) {
                    // One failing sweep must not stop the clean-up thread for good, since it is never restarted.
                    logger.error("Problem while checking for stopped messaging queues", e);
                }
            }
        }, "NettyClientMessagingQueue - Cleanup");
        cleanupThread.setDaemon(true);
        cleanupThread.start();
    }

    /** Close and unregister every queue whose client reports that it is not connected. */
    private static void removeDisconnectedQueues() {
        Iterator<NettyClientMessagingQueue> it = messagingQueueByChannel.values().iterator();
        while (it.hasNext()) {
            NettyClientMessagingQueue queue = it.next();
            if (!queue.isConnected()) {
                queue.close();
                it.remove();
            }
        }
    }

    /**
     * Create the queue as a daemon thread named after the peer and start it right away.
     *
     * @param nettyClient
     *            the client used to write the messages; the connected state is asserted
     */
    private NettyClientMessagingQueue(NettyClient nettyClient) {
        super("Messaging-Queue " + nettyClient.getPeerIp());
        AssertTools.assertTrue(nettyClient.isConnected(), "The netty client is not connected");
        setDaemon(true);
        this.nettyClient = nettyClient;
        // Every field is assigned before the thread starts, so run() sees a fully initialized instance.
        start();
    }

    /**
     * Request the sender thread to stop. The thread is interrupted so that it wakes up if it is waiting for a
     * message; it closes the client when it leaves its loop.
     */
    public void close() {
        this.closeRequested.set(true);
        interrupt();
    }

    /**
     * @return the client currently used to write the messages
     */
    public NettyClient getNettyClient() {
        return this.nettyClient;
    }

    /**
     * @return the connected state reported by the current client
     */
    public boolean isConnected() {
        return this.nettyClient.isConnected();
    }

    /**
     * Sender loop: takes the messages one by one and writes them with the client until {@link #close()} is called or
     * the client is no longer connected, then closes the client.
     *
     * <ul>
     * <li>On a {@link ChannelException} the same message is retried after a pause.</li>
     * <li>On any other exception, or when interrupted, the current message is dropped.</li>
     * </ul>
     */
    @Override
    public void run() {
        // The message being sent is kept here across retries instead of being pushed back on the bounded deque:
        // pushing back with addFirst() throws IllegalStateException when producers filled the queue meanwhile.
        Object pending = null;
        try {
            while (!this.closeRequested.get() && this.nettyClient.isConnected()) {
                try {
                    if (pending == null) {
                        pending = this.msgQueue.take();
                    }
                    this.nettyClient.writeFlushWait(pending);
                    pending = null;
                } catch (InterruptedException e) {
                    // The interrupt flag is deliberately not restored: the loop condition decides whether to stop,
                    // and a set flag would make the next take() fail immediately.
                    pending = null;
                    logger.debug("Interrupted");
                } catch (ChannelException e) {
                    logger.warn("Got an io exception while sending a message. Will retry", e);
                    ThreadTools.sleep(RETRY_DELAY_MS);
                } catch (Exception e) {
                    pending = null;
                    logger.error("Got an exception while sending a message", e);
                }
            }
        } finally {
            // Always release the client, even if something unexpected escapes the loop.
            CloseableTools.close(this.nettyClient);
        }
    }

    /**
     * Queue a message for sending. Blocks while the queue is full. Nothing drains the queue once the sender thread
     * has stopped, so a call on a stopped queue can block indefinitely when the queue is full.
     *
     * <p>
     * If the calling thread is interrupted while waiting, the message is not queued, an error is logged and the
     * thread's interrupt flag is set again so the caller can react to it.
     * </p>
     *
     * @param obj
     *            the message to send
     */
    public void send(Object obj) {
        try {
            this.msgQueue.put(obj);
        } catch (InterruptedException e) {
            logger.error("Was interrupted while placing a message on the queue");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Replace the client used to write the messages.
     *
     * @param nettyClient
     *            the new client
     */
    public void setNettyClient(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }
}
