package net.dempsy.transport.tcp.nio;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import net.dempsy.Infrastructure;
import net.dempsy.Manager;
import net.dempsy.monitoring.NodeStatsCollector;
import net.dempsy.serialization.Serializer;
import net.dempsy.transport.MessageTransportException;
import net.dempsy.transport.NodeAddress;
import net.dempsy.transport.SenderFactory;
import net.dempsy.transport.tcp.TcpAddress;
import net.dempsy.transport.tcp.nio.internal.NioUtils;
import net.dempsy.util.Functional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/dempsy/transport/tcp/nio/NioSenderFactory.class */
public class NioSenderFactory implements SenderFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(NioSenderFactory.class);
    public static final String CONFIG_KEY_SENDER_THREADS = "send_threads";
    public static final String DEFAULT_SENDER_THREADS = "2";
    public static final String CONFIG_KEY_SENDER_MAX_QUEUED = "send_max_queued";
    public static final String DEFAULT_SENDER_MAX_QUEUED = "1000";
    public static final String CONFIG_KEY_SENDER_TCP_MTU = "tcp_mtu";
    public static final String DEFAULT_SENDER_TCP_MTU = "1400";
    public static final String CONFIG_KEY_SENDER_STOP_TIMEOUT_MILLIS = "sender_stop_timeout_millis";
    public static final String DEFAULT_SENDER_STOP_TIMEOUT_MILLIS = "3000";
    NodeStatsCollector statsCollector;
    String nodeId;
    int maxNumberOfQueuedOutgoing;
    private Sending[] sendings;
    private Thread[] sendingsThreads;
    private final ConcurrentHashMap<TcpAddress, NioSender> senders = new ConcurrentHashMap<>();
    final ConcurrentHashMap<NioSender, NioSender> idleSenders = new ConcurrentHashMap<>();
    final Manager<Serializer> serializerManager = new Manager<>(Serializer.class);
    final AtomicBoolean isRunning = new AtomicBoolean(true);
    int mtu = Integer.parseInt(DEFAULT_SENDER_TCP_MTU);
    int stopTimeout = Integer.parseInt(DEFAULT_SENDER_STOP_TIMEOUT_MILLIS);
    private final AtomicBoolean sendingsRunning = new AtomicBoolean(true);

    /* loaded from: input_file:net/dempsy/transport/tcp/nio/NioSenderFactory$Sending.class */
    public static class Sending implements Runnable {
        final AtomicBoolean isRunning;
        final Selector selector;
        final String nodeId;
        final Map<NioSender, NioSender> idleSenders;
        final NodeStatsCollector statsCollector;

        Sending(AtomicBoolean atomicBoolean, String str, Map<NioSender, NioSender> map, NodeStatsCollector nodeStatsCollector) throws MessageTransportException {
            this.isRunning = atomicBoolean;
            this.nodeId = str;
            this.idleSenders = map;
            this.statsCollector = nodeStatsCollector;
            try {
                this.selector = Selector.open();
            } catch (IOException e) {
                throw new MessageTransportException(e);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = 0;
            while (this.isRunning.get()) {
                try {
                    try {
                    } catch (IOException e) {
                        NioSenderFactory.LOGGER.error(this.nodeId + " sender failed", e);
                    }
                    if (this.selector.selectNow() == 0) {
                        Set<SelectionKey> keys = this.selector.keys();
                        if (keys != null && keys.size() > 0) {
                            i = 0;
                            SenderHolder senderHolder = (SenderHolder) keys.stream().map(selectionKey -> {
                                return (SenderHolder) selectionKey.attachment();
                            }).filter(senderHolder2 -> {
                                return !senderHolder2.readyToWrite(true);
                            }).filter(senderHolder3 -> {
                                return senderHolder3.readyToSerialize();
                            }).findFirst().orElse(null);
                            if (senderHolder != null) {
                                senderHolder.trySerialize();
                            } else {
                                SelectionKey orElse = keys.stream().filter(selectionKey2 -> {
                                    return ((SenderHolder) selectionKey2.attachment()).shouldClose();
                                }).findAny().orElse(null);
                                if (orElse != null) {
                                    ((SenderHolder) orElse.attachment()).close(orElse);
                                }
                            }
                        } else if (checkForNewSenders()) {
                            i = 0;
                        } else {
                            i++;
                            if (i > 1000) {
                                NioUtils.dontInterrupt(() -> {
                                    Thread.sleep(1L);
                                }, interruptedException -> {
                                    if (this.isRunning.get()) {
                                        NioSenderFactory.LOGGER.error(this.nodeId + " sender interrupted", interruptedException);
                                    }
                                });
                            } else {
                                Thread.yield();
                            }
                        }
                    } else {
                        i = 0;
                        Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                        while (it.hasNext()) {
                            SelectionKey next = it.next();
                            it.remove();
                            if (next.isValid()) {
                                if (next.isWritable()) {
                                    SenderHolder senderHolder4 = (SenderHolder) next.attachment();
                                    if (senderHolder4.writeSomethingReturnDone(next, this.statsCollector)) {
                                        if (!senderHolder4.shouldClose()) {
                                            this.idleSenders.putIfAbsent(senderHolder4.sender, senderHolder4.sender);
                                            next.cancel();
                                        } else if (!senderHolder4.close(next)) {
                                            this.idleSenders.putIfAbsent(senderHolder4.sender, senderHolder4.sender);
                                            next.cancel();
                                        }
                                    }
                                }
                            }
                        }
                    }
                } finally {
                    if (this.selector != null) {
                        NioUtils.closeQuietly(this.selector, NioSenderFactory.LOGGER, "Failed to close selector on Sender thread.");
                    }
                }
            }
        }

        private boolean checkForNewSenders() throws IOException {
            boolean z = false;
            Set<NioSender> keySet = this.idleSenders.keySet();
            HashSet hashSet = new HashSet();
            try {
                keySet.stream().filter(nioSender -> {
                    return nioSender.messages.peek() != null;
                }).forEach(nioSender2 -> {
                    NioSender remove = this.idleSenders.remove(nioSender2);
                    if (remove != null) {
                        hashSet.add(remove);
                    }
                });
                Iterator it = hashSet.iterator();
                while (it.hasNext()) {
                    NioSender nioSender3 = (NioSender) it.next();
                    if (nioSender3.messages.peek() != null) {
                        new SenderHolder(nioSender3, NioSenderFactory.LOGGER).register(this.selector);
                        it.remove();
                        z = true;
                    }
                }
                return z;
            } finally {
                hashSet.forEach(nioSender4 -> {
                    this.idleSenders.putIfAbsent(nioSender4, nioSender4);
                });
            }
        }
    }

    public void close() {
        ArrayList arrayList;
        boolean z;
        LOGGER.trace(this.nodeId + " stopping " + NioSenderFactory.class.getSimpleName());
        synchronized (this) {
            this.isRunning.set(false);
            arrayList = new ArrayList(this.senders.values());
        }
        arrayList.forEach(nioSender -> {
            nioSender.stop();
        });
        synchronized (this) {
            z = this.senders.size() > 0;
        }
        if (z) {
            close();
        }
        this.sendingsRunning.set(false);
        ArrayList arrayList2 = new ArrayList(Arrays.asList(this.sendingsThreads));
        boolean z2 = false;
        long currentTimeMillis = System.currentTimeMillis();
        while (arrayList2.size() > 0 && !z2) {
            z2 = true;
            Iterator it = arrayList2.iterator();
            while (it.hasNext()) {
                if (((Thread) it.next()).isAlive()) {
                    z2 = false;
                } else {
                    it.remove();
                }
            }
            if (System.currentTimeMillis() - currentTimeMillis > this.stopTimeout) {
                LOGGER.warn("Stopping without having been able to stop all sending threads.");
                z2 = true;
            }
        }
    }

    public boolean isReady() {
        return true;
    }

    /* renamed from: getSender, reason: merged with bridge method [inline-methods] */
    public NioSender m38getSender(NodeAddress nodeAddress) throws MessageTransportException {
        TcpAddress tcpAddress = (TcpAddress) nodeAddress;
        if (!this.isRunning.get()) {
            throw new MessageTransportException(this.nodeId + " sender had getSender called while stopped.");
        }
        NioSender computeIfAbsent = this.senders.computeIfAbsent(tcpAddress, tcpAddress2 -> {
            return new NioSender(tcpAddress2, this);
        });
        try {
            computeIfAbsent.connect(false);
            return computeIfAbsent;
        } catch (IOException e) {
            throw new MessageTransportException(this.nodeId + " sender failed to connect to " + nodeAddress, e);
        }
    }

    public void start(Infrastructure infrastructure) {
        this.statsCollector = infrastructure.getNodeStatsCollector();
        this.nodeId = infrastructure.getNodeId();
        int parseInt = Integer.parseInt(infrastructure.getConfigValue(NioSender.class, CONFIG_KEY_SENDER_THREADS, DEFAULT_SENDER_THREADS));
        this.mtu = Integer.parseInt(infrastructure.getConfigValue(NioSender.class, CONFIG_KEY_SENDER_TCP_MTU, DEFAULT_SENDER_TCP_MTU));
        this.maxNumberOfQueuedOutgoing = Integer.parseInt(infrastructure.getConfigValue(NioSender.class, CONFIG_KEY_SENDER_MAX_QUEUED, DEFAULT_SENDER_MAX_QUEUED));
        this.stopTimeout = Integer.parseInt(infrastructure.getConfigValue(NioSender.class, CONFIG_KEY_SENDER_STOP_TIMEOUT_MILLIS, DEFAULT_SENDER_STOP_TIMEOUT_MILLIS));
        this.sendings = new Sending[parseInt];
        this.sendingsThreads = new Thread[parseInt];
        for (int i = 0; i < this.sendings.length; i++) {
            Sending sending = new Sending(this.sendingsRunning, this.nodeId, this.idleSenders, this.statsCollector);
            this.sendings[i] = sending;
            Thread thread = new Thread(sending, "nio-sender-" + i + "-" + this.nodeId);
            this.sendingsThreads[i] = thread;
            Functional.chain(thread, new Consumer[]{thread2 -> {
                thread2.start();
            }});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void imDone(TcpAddress tcpAddress) {
        this.senders.remove(tcpAddress);
    }
}
