package io.hekate.cluster.internal.gossip;

import io.hekate.cluster.ClusterAddress;
import io.hekate.cluster.ClusterNodeId;
import io.hekate.cluster.internal.gossip.GossipProtocol;
import io.hekate.network.NetworkClient;
import io.hekate.network.NetworkClientCallback;
import io.hekate.network.NetworkConnector;
import io.hekate.network.NetworkEndpoint;
import io.hekate.network.NetworkFuture;
import io.hekate.network.NetworkMessage;
import io.hekate.network.NetworkServerHandler;
import io.hekate.util.async.AsyncUtils;
import io.hekate.util.format.ToString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hekate/cluster/internal/gossip/GossipCommManager.class */
public class GossipCommManager implements NetworkServerHandler<GossipProtocol> {
    /** Class logger; assigned in the static initializer. */
    private static final Logger log;

    /** Cached guard for debug-level log statements; assigned in the static initializer. */
    private static final boolean DEBUG;

    /** Cached guard for trace-level log statements; assigned in the static initializer. */
    private static final boolean TRACE;

    /** Monitor that guards every read and write of {@link #clients}. */
    private final Object mux = new Object();

    /** Registry of known connections (inbound and outbound), keyed by the remote node's id. Guarded by {@link #mux}. */
    private final Map<ClusterNodeId, EndpointHolder> clients = new HashMap<>();

    /** Receiver of incoming messages and of send success/failure notifications. */
    private final Callback callback;

    /** Factory of network clients used for outbound connections. */
    private final NetworkConnector<GossipProtocol> net;

    /** Callback attached to the outbound connections that are created by {@code send(...)}. */
    private final NetworkClientCallback<GossipProtocol> netClientCallback;

    /** Decompiler-exposed assertion switch; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/hekate/cluster/internal/gossip/GossipCommManager$Callback.class */
    public interface Callback {
        void onReceive(GossipProtocol gossipProtocol);

        void onSendSuccess(GossipProtocol gossipProtocol);

        void onSendFailure(GossipProtocol gossipProtocol, Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/cluster/internal/gossip/GossipCommManager$EndpointHolder.class */
    /**
     * Registry entry that pairs a network endpoint with a flag telling whether the connection is an outbound one.
     */
    public static class EndpointHolder {
        // NOTE: field names and their order are intentionally left untouched since toString() delegates to ToString.format(this).
        private final boolean outbound;
        private final NetworkEndpoint<GossipProtocol> endpoint;

        /**
         * Creates a new holder.
         *
         * @param endpoint Endpoint to hold.
         * @param outbound {@code true} for an outbound connection, {@code false} for an inbound one.
         */
        public EndpointHolder(NetworkEndpoint<GossipProtocol> endpoint, boolean outbound) {
            this.outbound = outbound;
            this.endpoint = endpoint;
        }

        /**
         * Returns the wrapped endpoint.
         *
         * @return Endpoint that was passed to the constructor.
         */
        public NetworkEndpoint<GossipProtocol> endpoint() {
            return endpoint;
        }

        /**
         * Tells whether the wrapped endpoint is an outbound connection.
         *
         * @return Value of the {@code outbound} flag that was passed to the constructor.
         */
        public boolean isOutbound() {
            return outbound;
        }

        @Override
        public String toString() {
            return ToString.format(this);
        }
    }

    /**
     * Constructs a new manager.
     *
     * <p>
     * Besides storing its arguments, the constructor prepares the callback that gets attached to every outbound connection opened by
     * {@link #send(GossipProtocol.GossipMessage, Runnable)}: it forwards decoded messages to {@code callback} and unregisters the
     * connection from the registry once it gets closed.
     * </p>
     *
     * @param networkConnector Connector for creating outbound clients (must not be {@code null}; checked only if assertions are enabled).
     * @param callback Listener of messages and send results (must not be {@code null}; checked only if assertions are enabled).
     */
    public GossipCommManager(NetworkConnector<GossipProtocol> networkConnector, final Callback callback) {
        if (!$assertionsDisabled && networkConnector == null) {
            throw new AssertionError("Network connector is null.");
        }

        if (!$assertionsDisabled && callback == null) {
            throw new AssertionError("Callback is null.");
        }

        this.net = networkConnector;
        this.callback = callback;

        this.netClientCallback = new NetworkClientCallback<GossipProtocol>() {
            @Override
            public void onMessage(NetworkMessage<GossipProtocol> packet, NetworkClient<GossipProtocol> from) throws IOException {
                GossipProtocol received = packet.decode();

                if (GossipCommManager.TRACE) {
                    GossipCommManager.log.trace("Received message via outbound connection [message={}]", received);
                }

                callback.onReceive(received);
            }

            @Override
            public void onDisconnect(NetworkClient<GossipProtocol> closed, Optional<Throwable> cause) {
                // The context is set to the remote node id when the client is created by send(...).
                ClusterNodeId remoteId = (ClusterNodeId) closed.getContext();

                if (remoteId == null) {
                    return;
                }

                if (GossipCommManager.DEBUG) {
                    GossipCommManager.log.debug("Closed outbound connection [to={}]", remoteId);
                }

                synchronized (GossipCommManager.this.mux) {
                    EndpointHolder registered = GossipCommManager.this.clients.get(remoteId);

                    // Unregister only if the registry still points to this very client (identity check).
                    boolean sameConnection = registered != null && registered.endpoint() == closed;

                    if (sameConnection) {
                        if (GossipCommManager.DEBUG) {
                            GossipCommManager.log.debug("Removing outbound connection from registry [to={}]", remoteId);
                        }

                        GossipCommManager.this.clients.remove(remoteId);
                    }
                }
            }
        };
    }

    /**
     * Sends the message to its target node via a registered connection, opening (and registering) a new outbound connection if
     * none is known for the target node yet.
     *
     * <p>
     * Once the send operation completes, the manager's callback is notified about success or failure and then the optional
     * completion task is executed.
     * </p>
     *
     * @param gossipMessage Message to send; its {@code to()} address identifies the target node.
     * @param runnable Optional task to run after the callback has been notified (can be {@code null}).
     */
    public void send(GossipProtocol.GossipMessage gossipMessage, Runnable runnable) {
        final Callback listener = this.callback;
        final ClusterAddress target = gossipMessage.to();
        final ClusterNodeId targetId = target.id();

        boolean created = false;
        EndpointHolder holder;

        synchronized (this.mux) {
            holder = this.clients.get(targetId);

            if (holder == null) {
                NetworkClient<GossipProtocol> client = this.net.newClient();

                // Remember the target id so that the disconnect handler can find this client in the registry.
                client.setContext(targetId);

                holder = new EndpointHolder(client, true);

                this.clients.put(targetId, holder);

                client.connect(target.socket(), new GossipProtocol.Connect(gossipMessage.from().id()), this.netClientCallback);

                created = true;
            }
        }

        if (DEBUG && created) {
            log.debug("Created a new outbound connection [to={}]", target);
        }

        if (TRACE) {
            log.trace("Sending message [outboundConnection={}, message={}]", Boolean.valueOf(holder.isOutbound()), gossipMessage);
        }

        holder.endpoint().send(gossipMessage, (sent, failure, endpoint) -> {
            if (failure.isPresent()) {
                Throwable error = failure.get();

                if (TRACE) {
                    log.trace("Failed to send a message [reason={}, message={}]", error, sent);
                }

                listener.onSendFailure(sent, error);
            } else {
                listener.onSendSuccess(sent);
            }

            // The completion task runs after the listener in both outcomes.
            if (runnable != null) {
                runnable.run();
            }
        });
    }

    /**
     * Sends the message through a dedicated, unregistered client: the message is passed as the connect payload and the client is
     * disconnected as soon as the connection gets established.
     *
     * <p>
     * A successful connect is reported as a send success; a disconnect that carries an error is reported as a send failure. In both
     * cases the optional completion task is executed after the callback has been notified.
     * </p>
     *
     * @param gossipProtocol Message to send; its {@code toAddress()} is used as the connect address.
     * @param runnable Optional task to run after the callback has been notified (can be {@code null}).
     */
    public void sendAndDisconnect(final GossipProtocol gossipProtocol, final Runnable runnable) {
        NetworkClient<GossipProtocol> oneShotClient = this.net.newClient();

        NetworkClientCallback<GossipProtocol> oneShotCallback = new NetworkClientCallback<GossipProtocol>() {
            @Override
            public void onMessage(NetworkMessage<GossipProtocol> networkMessage, NetworkClient<GossipProtocol> networkClient) {
                // No-op.
            }

            @Override
            public void onConnect(NetworkClient<GossipProtocol> networkClient) {
                networkClient.disconnect();

                GossipCommManager.this.callback.onSendSuccess(gossipProtocol);

                if (runnable != null) {
                    runnable.run();
                }
            }

            @Override
            public void onDisconnect(NetworkClient<GossipProtocol> networkClient, Optional<Throwable> optional) {
                // A disconnect without an error is not reported.
                if (!optional.isPresent()) {
                    return;
                }

                GossipCommManager.this.callback.onSendFailure(gossipProtocol, optional.get());

                if (runnable != null) {
                    runnable.run();
                }
            }
        };

        oneShotClient.connect(gossipProtocol.toAddress(), gossipProtocol, oneShotCallback);
    }

    /**
     * Handles a new inbound connection.
     *
     * <p>
     * If the first message is not of the {@code CONNECT} type then the connection is closed and the message is handed over to the
     * callback. Otherwise the connection is tagged with the remote node id and is added to the registry, unless the id is missing
     * (the connection gets rejected) or another connection is already registered for that node.
     * </p>
     *
     * @param gossipProtocol First message of the connection.
     * @param networkEndpoint Inbound connection.
     */
    @Override
    public void onConnect(GossipProtocol gossipProtocol, NetworkEndpoint<GossipProtocol> networkEndpoint) {
        boolean handshake = gossipProtocol.type() == GossipProtocol.Type.CONNECT;

        if (!handshake) {
            networkEndpoint.disconnect();

            this.callback.onReceive(gossipProtocol);

            return;
        }

        GossipProtocol.Connect connect = (GossipProtocol.Connect) gossipProtocol;

        ClusterNodeId remoteId = connect.nodeId();

        if (remoteId == null) {
            if (DEBUG) {
                log.debug("Rejecting connection without a cluster node id [message={}, connection={}]", gossipProtocol, networkEndpoint);
            }

            networkEndpoint.disconnect();

            return;
        }

        if (DEBUG) {
            log.debug("Got a new inbound connection [from={}]", remoteId);
        }

        // Tag the connection so that the disconnect handler can resolve the node id later.
        networkEndpoint.setContext(remoteId);

        synchronized (this.mux) {
            boolean alreadyRegistered = this.clients.containsKey(remoteId);

            if (alreadyRegistered) {
                if (DEBUG) {
                    log.debug("Will not register inbound connection since another connection exists [from={}]", remoteId);
                }
            } else {
                if (DEBUG) {
                    log.debug("Registering inbound connection [from={}]", remoteId);
                }

                this.clients.put(remoteId, new EndpointHolder(networkEndpoint, false));
            }
        }
    }

    /**
     * Decodes a message that arrived via an inbound connection and passes it to the callback.
     *
     * @param networkMessage Message to decode.
     * @param networkEndpoint Connection that delivered the message (not used).
     *
     * @throws IOException If decoding fails.
     */
    @Override
    public void onMessage(NetworkMessage<GossipProtocol> networkMessage, NetworkEndpoint<GossipProtocol> networkEndpoint) throws IOException {
        final GossipProtocol received = networkMessage.decode();

        if (TRACE) {
            log.trace("Received message via inbound connection [message={}]", received);
        }

        this.callback.onReceive(received);
    }

    /**
     * Handles the closing of an inbound connection by removing it from the registry, provided that the registry entry of the remote
     * node still refers to this very connection.
     *
     * @param networkEndpoint Inbound connection that got closed.
     */
    @Override
    public void onDisconnect(NetworkEndpoint<GossipProtocol> networkEndpoint) {
        ClusterNodeId remoteId = (ClusterNodeId) networkEndpoint.getContext();

        // Connections that never got tagged with a node id have nothing to clean up.
        if (remoteId == null) {
            return;
        }

        if (DEBUG) {
            log.debug("Closed inbound connection [from={}]", remoteId);
        }

        synchronized (this.mux) {
            EndpointHolder registered = this.clients.get(remoteId);

            // Identity check: the node's entry may belong to a different connection.
            if (registered == null || registered.endpoint() != networkEndpoint) {
                return;
            }

            if (DEBUG) {
                log.debug("Removing inbound connection from registry [from={}]", remoteId);
            }

            this.clients.remove(remoteId);
        }
    }

    /**
     * Clears the connections registry, disconnects all outbound connections that were registered and waits (uninterruptedly) for
     * those disconnects to complete.
     *
     * <p>
     * Inbound connections are only removed from the registry; this method does not disconnect them. Failures of disconnect
     * operations are logged and never propagated to the caller.
     * </p>
     */
    public void stop() {
        // Typed collections (instead of raw ones) so that elements can be used without unchecked casts.
        ArrayList<NetworkFuture<?>> pending = new ArrayList<>();

        synchronized (this.mux) {
            ArrayList<EndpointHolder> snapshot = new ArrayList<>(this.clients.values());

            this.clients.clear();

            if (!snapshot.isEmpty()) {
                if (DEBUG) {
                    log.debug("Closing connections  [size={}]", Integer.valueOf(snapshot.size()));
                }

                for (EndpointHolder holder : snapshot) {
                    if (holder.isOutbound()) {
                        pending.add(holder.endpoint().disconnect());
                    }
                }
            }
        }

        // Await outside of the lock so that disconnect handlers can acquire it while we are waiting.
        for (NetworkFuture<?> future : pending) {
            try {
                AsyncUtils.getUninterruptedly(future);
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();

                if (cause instanceof IOException) {
                    // I/O errors are reported at the debug level only.
                    if (DEBUG) {
                        log.debug("Failed to close network connection due to an I/O error [cause={}]", e.toString());
                    }
                } else {
                    log.warn("Failed to close network connection.", cause);
                }
            }
        }
    }

    // Static initialization of the assertion switch, the class logger and the cached log-level guards.
    static {
        $assertionsDisabled = !GossipCommManager.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(GossipCommManager.class);
        DEBUG = log.isDebugEnabled();
        // Fixed: the trace guard used to be derived from isDebugEnabled(), so trace statements (and the construction of their
        // arguments) were not skipped when only the debug level was enabled.
        TRACE = log.isTraceEnabled();
    }
}
