package io.hekate.messaging.internal;

import io.hekate.cluster.ClusterNode;
import io.hekate.cluster.ClusterNodeId;
import io.hekate.cluster.ClusterTopology;
import io.hekate.cluster.ClusterView;
import io.hekate.cluster.health.DefaultFailureDetectorConfig;
import io.hekate.codec.CodecException;
import io.hekate.core.Hekate;
import io.hekate.core.HekateException;
import io.hekate.core.HekateSupport;
import io.hekate.failover.FailoverPolicy;
import io.hekate.failover.FailoverRoutingPolicy;
import io.hekate.failover.FailureInfo;
import io.hekate.failover.FailureResolution;
import io.hekate.failover.internal.DefaultFailoverContext;
import io.hekate.messaging.MessageInterceptor;
import io.hekate.messaging.MessageQueueOverflowException;
import io.hekate.messaging.MessageQueueTimeoutException;
import io.hekate.messaging.MessageReceiver;
import io.hekate.messaging.MessageTimeoutException;
import io.hekate.messaging.MessagingChannelClosedException;
import io.hekate.messaging.MessagingChannelId;
import io.hekate.messaging.MessagingEndpoint;
import io.hekate.messaging.MessagingException;
import io.hekate.messaging.broadcast.AggregateCallback;
import io.hekate.messaging.broadcast.AggregateFuture;
import io.hekate.messaging.broadcast.AggregateResult;
import io.hekate.messaging.broadcast.BroadcastCallback;
import io.hekate.messaging.broadcast.BroadcastFuture;
import io.hekate.messaging.broadcast.BroadcastResult;
import io.hekate.messaging.loadbalance.DefaultLoadBalancer;
import io.hekate.messaging.loadbalance.EmptyTopologyException;
import io.hekate.messaging.loadbalance.LoadBalancer;
import io.hekate.messaging.loadbalance.UnknownRouteException;
import io.hekate.messaging.unicast.FailureResponse;
import io.hekate.messaging.unicast.RejectedReplyException;
import io.hekate.messaging.unicast.ReplyDecision;
import io.hekate.messaging.unicast.Response;
import io.hekate.messaging.unicast.ResponseCallback;
import io.hekate.messaging.unicast.ResponseFuture;
import io.hekate.messaging.unicast.SendCallback;
import io.hekate.messaging.unicast.SendFuture;
import io.hekate.messaging.unicast.StreamFuture;
import io.hekate.network.NetworkConnector;
import io.hekate.partition.RendezvousHashMapper;
import io.hekate.util.async.Waiting;
import io.hekate.util.format.ToString;
import io.hekate.util.format.ToStringIgnore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway.class */
public class MessagingGateway<T> implements HekateSupport {
    // Channel name; used in log messages and returned by name().
    private final String name;

    // Base type of messages of this channel; returned by baseType().
    private final Class<T> baseType;

    @ToStringIgnore
    private final Logger log;

    // Snapshot of log.isDebugEnabled() taken once in the constructor.
    @ToStringIgnore
    private final boolean debug;

    @ToStringIgnore
    private final MessagingChannelId id;

    @ToStringIgnore
    private final ClusterNode localNode;

    @ToStringIgnore
    private final HekateSupport hekate;

    @ToStringIgnore
    private final NetworkConnector<MessagingProtocol> net;

    @ToStringIgnore
    private final ClusterView cluster;

    @ToStringIgnore
    private final MessageReceiver<T> receiver;

    // Optional hook that close() invokes before doing anything else (may be null).
    @ToStringIgnore
    private final CloseCallback onBeforeClose;

    @ToStringIgnore
    private final boolean checkIdle;

    @ToStringIgnore
    private final MessagingExecutor async;

    @ToStringIgnore
    private final MetricsCallback metrics;

    @ToStringIgnore
    private final ReceivePressureGuard receivePressure;

    // Optional send-side back pressure guard (close() checks it for null before terminating it).
    @ToStringIgnore
    private final SendPressureGuard sendPressure;

    @ToStringIgnore
    private final int nioThreads;

    @ToStringIgnore
    private final MessageInterceptor<T> interceptor;

    @ToStringIgnore
    private final DefaultMessagingChannel<T> channel;

    // Topology that the clients map was last synchronized with; reset to null by close(). Accessed under 'lock'.
    @ToStringIgnore
    private ClusterTopology clientsTopology;

    // Set to true by close() while holding the write lock; read by other methods under 'lock'.
    @ToStringIgnore
    private boolean closed;

    // Assertion flag as recovered by the decompiler; its initializer is not visible in this part of the file.
    static final /* synthetic */ boolean $assertionsDisabled;

    // Registered inbound connections; every access in the visible code synchronizes on this set.
    @ToStringIgnore
    private final Set<MessagingConnectionNetIn<T>> inbound = new HashSet<>();

    // Protects 'closed', 'clientsTopology' and 'clients'.
    @ToStringIgnore
    private final StampedLock lock = new StampedLock();

    // Per-node messaging clients; accessed under 'lock'.
    @ToStringIgnore
    private final Map<ClusterNodeId, MessagingClient<T>> clients = new HashMap<>();

    /* renamed from: io.hekate.messaging.internal.MessagingGateway$3, reason: invalid class name */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$3.class */
    /**
     * Compiler-generated lookup table (as recovered by the decompiler) that maps each
     * {@link FailoverRoutingPolicy} ordinal to a dense {@code switch} case label:
     * {@code RETRY_SAME_NODE -> 1}, {@code PREFER_SAME_NODE -> 2}, {@code RE_ROUTE -> 3}.
     * Entries that are never assigned stay at {@code 0}.
     */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$io$hekate$failover$FailoverRoutingPolicy = new int[FailoverRoutingPolicy.values().length];

        static {
            // Each constant is mapped in its own try block; a NoSuchFieldError for one constant is ignored
            // so that the remaining constants are still mapped.
            try {
                $SwitchMap$io$hekate$failover$FailoverRoutingPolicy[FailoverRoutingPolicy.RETRY_SAME_NODE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Intentionally ignored: the table entry keeps its default value of 0.
            }
            try {
                $SwitchMap$io$hekate$failover$FailoverRoutingPolicy[FailoverRoutingPolicy.PREFER_SAME_NODE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored: the table entry keeps its default value of 0.
            }
            try {
                $SwitchMap$io$hekate$failover$FailoverRoutingPolicy[FailoverRoutingPolicy.RE_ROUTE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Intentionally ignored: the table entry keeps its default value of 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$AggregateCallbackFuture.class */
    /**
     * Future that doubles as an {@link AggregateCallback}: the callback notification completes the future.
     *
     * @param <T> Message type.
     */
    public static class AggregateCallbackFuture<T> extends AggregateFuture<T> implements AggregateCallback<T> {
        private AggregateCallbackFuture() {
            // Instantiated only by the enclosing gateway.
        }

        /**
         * Completes this future exceptionally if an error is reported, otherwise completes it with the result.
         *
         * @param th Error ({@code null} on success).
         * @param aggregateResult Aggregation result.
         */
        @Override // io.hekate.messaging.broadcast.AggregateCallback
        public void onComplete(Throwable th, AggregateResult<T> aggregateResult) {
            if (th != null) {
                completeExceptionally(th);
                return;
            }

            complete(aggregateResult);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$BroadcastCallbackFuture.class */
    /**
     * Future that doubles as a {@link BroadcastCallback}: the callback notification completes the future.
     *
     * @param <T> Message type.
     */
    public static class BroadcastCallbackFuture<T> extends BroadcastFuture<T> implements BroadcastCallback<T> {
        private BroadcastCallbackFuture() {
            // Instantiated only by the enclosing gateway.
        }

        /**
         * Completes this future exceptionally if an error is reported, otherwise completes it with the result.
         *
         * @param th Error ({@code null} on success).
         * @param broadcastResult Broadcast result.
         */
        @Override // io.hekate.messaging.broadcast.BroadcastCallback
        public void onComplete(Throwable th, BroadcastResult<T> broadcastResult) {
            if (th != null) {
                completeExceptionally(th);
                return;
            }

            complete(broadcastResult);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$ClientSelectionRejectedException.class */
    /**
     * Lightweight carrier exception: it has no message, records no stack trace and does not support suppression.
     * Callers in this class unwrap it via {@link #getCause()}.
     */
    public static class ClientSelectionRejectedException extends Exception {
        private static final long serialVersionUID = 1L;

        /**
         * @param th Real cause that this exception wraps.
         */
        public ClientSelectionRejectedException(Throwable th) {
            super(
                null,  // no message
                th,    // wrapped cause
                false, // suppression disabled
                false  // stack trace not writable
            );
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$CloseCallback.class */
    /**
     * Hook that the gateway's {@code close()} method invokes (when one was supplied to the constructor).
     */
    public interface CloseCallback {
        /**
         * Called at the very beginning of {@code close()}, before the gateway acquires its write lock.
         */
        void onBeforeClose();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$FailoverCallback.class */
    /**
     * Receives the outcome of a failover decision (passed to {@code failoverAsync(...)}, whose body is not visible in
     * this part of the file).
     */
    public interface FailoverCallback {
        /**
         * Requests another attempt of the failed operation.
         *
         * @param failoverRoutingPolicy Policy that tells whether to reuse the same node or to re-route.
         * @param failureInfo Failure details to carry into the next attempt.
         */
        void retry(FailoverRoutingPolicy failoverRoutingPolicy, FailureInfo failureInfo);

        /**
         * Reports that the operation must be failed with the given error.
         *
         * @param th Error to report.
         */
        void fail(Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$ResponseCallbackFuture.class */
    /**
     * Future that doubles as a {@link ResponseCallback}: the first non-partial response (or an error) completes it.
     *
     * @param <T> Message type.
     */
    public static class ResponseCallbackFuture<T> extends ResponseFuture<T> implements ResponseCallback<T> {
        private ResponseCallbackFuture() {
            // Instantiated only by the enclosing gateway.
        }

        /**
         * Completes this future exceptionally on error; on success completes it only if the response is not partial
         * (partial responses are ignored).
         *
         * @param th Error ({@code null} on success).
         * @param response Response.
         */
        @Override // io.hekate.messaging.unicast.ResponseCallback
        public void onComplete(Throwable th, Response<T> response) {
            if (th == null) {
                if (!response.isPartial()) {
                    complete(response);
                }
            } else {
                completeExceptionally(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$SendCallbackFuture.class */
    /**
     * Future that doubles as a {@link SendCallback}: the callback notification completes the future.
     */
    public static class SendCallbackFuture extends SendFuture implements SendCallback {
        private SendCallbackFuture() {
            // Instantiated only by the enclosing gateway.
        }

        /**
         * Completes this future exceptionally if an error is reported, otherwise completes it with {@code null}.
         *
         * @param th Error ({@code null} on success).
         */
        @Override // io.hekate.messaging.unicast.SendCallback
        public void onComplete(Throwable th) {
            if (th != null) {
                completeExceptionally(th);
                return;
            }

            complete(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/messaging/internal/MessagingGateway$StreamCallbackFuture.class */
    /**
     * Future that doubles as a {@link ResponseCallback} for streams: it accumulates the payload of every response and
     * completes with the accumulated list once a non-partial response arrives.
     *
     * @param <T> Message type.
     */
    public static class StreamCallbackFuture<T> extends StreamFuture<T> implements ResponseCallback<T> {
        /** Payloads received so far; created lazily on the first successful response. */
        private List<T> result;

        private StreamCallbackFuture() {
            // Instantiated only by the enclosing gateway.
        }

        /**
         * Records the response payload, or fails the future if an error is reported.
         *
         * @param th Error ({@code null} on success).
         * @param response Response chunk.
         */
        @Override // io.hekate.messaging.unicast.ResponseCallback
        public void onComplete(Throwable th, Response<T> response) {
            if (th == null) {
                if (this.result == null) {
                    this.result = new ArrayList<>();
                }

                this.result.add(response.get());

                // The last (non-partial) chunk finishes the stream.
                if (!response.isPartial()) {
                    complete(this.result);
                }
            } else {
                completeExceptionally(th);
            }
        }
    }

    /**
     * Constructs a new gateway and its default {@link DefaultMessagingChannel}.
     *
     * @param str Channel name (must not be {@code null}).
     * @param hekateSupport Provider of the owning {@link Hekate} instance.
     * @param cls Base type of channel messages (must not be {@code null}).
     * @param networkConnector Network connector (must not be {@code null}).
     * @param clusterNode Local cluster node (must not be {@code null}).
     * @param clusterView Cluster view (must not be {@code null}).
     * @param messageReceiver Message receiver.
     * @param i Value reported by {@link #nioThreads()}.
     * @param messagingExecutor Executor (must not be {@code null}).
     * @param metricsCallback Metrics callback.
     * @param receivePressureGuard Receive-side back pressure guard.
     * @param sendPressureGuard Send-side back pressure guard.
     * @param failoverPolicy Failover policy that is passed to the default channel.
     * @param j Passed to the default channel (presumably the default timeout - not verifiable from this file section).
     * @param loadBalancer Load balancer; a {@link DefaultLoadBalancer} is used if {@code null}.
     * @param messageInterceptor Message interceptor.
     * @param i2 Number of partitions for the {@link RendezvousHashMapper}.
     * @param i3 Number of backup nodes for the {@link RendezvousHashMapper}.
     * @param logger Logger (dereferenced immediately to capture the debug flag).
     * @param z Value of the idle-check flag.
     * @param closeCallback Optional hook to invoke at the start of {@link #close()}.
     */
    public MessagingGateway(String str, HekateSupport hekateSupport, Class<T> cls, NetworkConnector<MessagingProtocol> networkConnector, ClusterNode clusterNode, ClusterView clusterView, MessageReceiver<T> messageReceiver, int i, MessagingExecutor messagingExecutor, MetricsCallback metricsCallback, ReceivePressureGuard receivePressureGuard, SendPressureGuard sendPressureGuard, FailoverPolicy failoverPolicy, long j, LoadBalancer<T> loadBalancer, MessageInterceptor<T> messageInterceptor, int i2, int i3, Logger logger, boolean z, CloseCallback closeCallback) {
        // Sanity checks (active only when assertions are enabled).
        if (!$assertionsDisabled && str == null) {
            throw new AssertionError("Name is null.");
        }
        if (!$assertionsDisabled && cls == null) {
            throw new AssertionError("Base type is null.");
        }
        if (!$assertionsDisabled && networkConnector == null) {
            throw new AssertionError("Network connector is null.");
        }
        if (!$assertionsDisabled && clusterNode == null) {
            throw new AssertionError("Local cluster node is null.");
        }
        if (!$assertionsDisabled && clusterView == null) {
            throw new AssertionError("Cluster view is null.");
        }
        if (!$assertionsDisabled && messagingExecutor == null) {
            throw new AssertionError("Executor is null.");
        }

        // Identity.
        id = new MessagingChannelId();
        name = str;
        baseType = cls;

        // Collaborators.
        hekate = hekateSupport;
        net = networkConnector;
        localNode = clusterNode;
        cluster = clusterView;
        receiver = messageReceiver;
        interceptor = messageInterceptor;
        nioThreads = i;
        async = messagingExecutor;
        metrics = metricsCallback;
        receivePressure = receivePressureGuard;
        sendPressure = sendPressureGuard;
        checkIdle = z;

        // Logging.
        log = logger;
        debug = logger.isDebugEnabled();

        onBeforeClose = closeCallback;

        // The channel receives a reference to this gateway, so it is created last.
        channel = new DefaultMessagingChannel<>(
            this,
            clusterView,
            RendezvousHashMapper.of(clusterView).withPartitions(i2).withBackupNodes(i3).build(),
            loadBalancer == null ? new DefaultLoadBalancer() : loadBalancer,
            failoverPolicy,
            j,
            null
        );
    }

    /**
     * Returns the identifier that was generated for this gateway at construction time.
     *
     * @return Channel identifier.
     */
    public MessagingChannelId id() {
        return id;
    }

    /**
     * Returns the channel name.
     *
     * @return Channel name.
     */
    public String name() {
        return name;
    }

    /**
     * Returns the NIO thread count that was supplied to the constructor.
     *
     * @return NIO thread count.
     */
    public int nioThreads() {
        return nioThreads;
    }

    /**
     * Returns the pool size reported by this gateway's {@link MessagingExecutor}.
     *
     * @return Worker pool size.
     */
    public int workerThreads() {
        return async.poolSize();
    }

    /**
     * Returns the local cluster node.
     *
     * @return Local node.
     */
    public ClusterNode localNode() {
        return localNode;
    }

    /**
     * Returns the message interceptor that was supplied to the constructor.
     *
     * @return Message interceptor.
     */
    public MessageInterceptor<T> interceptor() {
        return interceptor;
    }

    /**
     * Returns the logger of this gateway.
     *
     * @return Logger.
     */
    public Logger log() {
        return log;
    }

    /**
     * Future-based variant of {@link #send(Object, Object, MessagingOpts, SendCallback)}.
     *
     * @param obj Passed through to the message context (presumably the affinity key - confirm against newContext).
     * @param t Message.
     * @param messagingOpts Messaging options.
     *
     * @return Future that is completed when the send operation's callback gets notified.
     */
    public SendFuture send(Object obj, T t, MessagingOpts<T> messagingOpts) {
        SendCallbackFuture future = new SendCallbackFuture();

        // The future is its own callback.
        send(obj, t, messagingOpts, future);

        return future;
    }

    /**
     * Sends a one-way message and reports the outcome to the callback.
     *
     * <p>
     * The message type is verified first; then a back pressure slot is acquired, a timeout is scheduled if the
     * acquisition returned a positive value, and the message is routed and sent. Back pressure failures are reported to
     * the callback asynchronously.
     * </p>
     *
     * @param obj Passed through to the message context (presumably the affinity key - confirm against newContext).
     * @param t Message (must not be {@code null}).
     * @param messagingOpts Messaging options (must not be {@code null}).
     * @param sendCallback Callback.
     */
    public void send(Object obj, T t, MessagingOpts<T> messagingOpts, SendCallback sendCallback) {
        if (!$assertionsDisabled && t == null) {
            throw new AssertionError("Message must be not null.");
        }
        if (!$assertionsDisabled && messagingOpts == null) {
            throw new AssertionError("Messaging options must be not null.");
        }

        checkMessageType(t);

        MessageContext<T> ctx = newContext(obj, t, messagingOpts);

        try {
            long timeout = backPressureAcquire(ctx.opts().timeout(), t);

            // Only a positive value results in a scheduled timeout.
            if (timeout > 0) {
                doScheduleTimeout(timeout, ctx, sendCallback);
            }

            routeAndSend(ctx, sendCallback, null);
        } catch (MessageQueueOverflowException | MessageQueueTimeoutException | InterruptedException e) {
            notifyOnErrorAsync(ctx, sendCallback, e);
        }
    }

    /**
     * Future-based variant of {@link #request(Object, Object, MessagingOpts, ResponseCallback)}.
     *
     * @param obj Passed through to the message context (presumably the affinity key - confirm against newContext).
     * @param t Request message.
     * @param messagingOpts Messaging options.
     *
     * @return Future that is completed by the first non-partial response or by an error.
     */
    public ResponseFuture<T> request(Object obj, T t, MessagingOpts<T> messagingOpts) {
        ResponseCallbackFuture<T> future = new ResponseCallbackFuture<>();

        // The future is its own callback.
        request(obj, t, messagingOpts, future);

        return future;
    }

    /**
     * Submits a request and reports the response (or an error) to the callback.
     *
     * @param obj Passed through to the message context (presumably the affinity key - confirm against newContext).
     * @param t Request message (must not be {@code null}).
     * @param messagingOpts Messaging options (must not be {@code null}).
     * @param responseCallback Callback (must not be {@code null}).
     */
    public void request(Object obj, T t, MessagingOpts<T> messagingOpts, ResponseCallback<T> responseCallback) {
        if (!$assertionsDisabled && t == null) {
            throw new AssertionError("Request message must be not null.");
        }
        if (!$assertionsDisabled && messagingOpts == null) {
            throw new AssertionError("Messaging options must be not null.");
        }
        if (!$assertionsDisabled && responseCallback == null) {
            throw new AssertionError("Callback must be not null.");
        }

        checkMessageType(t);

        MessageContext<T> ctx = newContext(obj, t, messagingOpts);

        requestAsync(t, ctx, responseCallback);
    }

    /**
     * Future-based variant of {@link #stream(Object, Object, MessagingOpts, ResponseCallback)}.
     *
     * @param obj Passed through to the message context (presumably the affinity key - confirm against newContext).
     * @param t Request message.
     * @param messagingOpts Messaging options.
     *
     * @return Future that is completed with all received payloads once the final (non-partial) response arrives.
     */
    public StreamFuture<T> stream(Object obj, T t, MessagingOpts<T> messagingOpts) {
        StreamCallbackFuture<T> future = new StreamCallbackFuture<>();

        // The future is its own callback.
        stream(obj, t, messagingOpts, future);

        return future;
    }

    /**
     * Submits a streaming request; differs from {@code request(...)} only in that the message context is created with
     * the stream flag set to {@code true}.
     *
     * @param obj Passed through to the message context (presumably the affinity key - confirm against newContext).
     * @param t Request message (must not be {@code null}).
     * @param messagingOpts Messaging options (must not be {@code null}).
     * @param responseCallback Callback (must not be {@code null}).
     */
    public void stream(Object obj, T t, MessagingOpts<T> messagingOpts, ResponseCallback<T> responseCallback) {
        if (!$assertionsDisabled && t == null) {
            throw new AssertionError("Request message must be not null.");
        }
        if (!$assertionsDisabled && messagingOpts == null) {
            throw new AssertionError("Messaging options must be not null.");
        }
        if (!$assertionsDisabled && responseCallback == null) {
            throw new AssertionError("Callback must be not null.");
        }

        checkMessageType(t);

        MessageContext<T> ctx = newContext(obj, t, messagingOpts, true);

        requestAsync(t, ctx, responseCallback);
    }

    /**
     * Future-based variant of {@link #broadcast(Object, Object, MessagingOpts, BroadcastCallback)}.
     *
     * @param obj Passed through to each underlying send operation.
     * @param t Message.
     * @param messagingOpts Messaging options.
     *
     * @return Future that is completed when the broadcast callback gets notified.
     */
    public BroadcastFuture<T> broadcast(Object obj, T t, MessagingOpts<T> messagingOpts) {
        BroadcastCallbackFuture<T> future = new BroadcastCallbackFuture<>();

        // The future is its own callback.
        broadcast(obj, t, messagingOpts, future);

        return future;
    }

    /**
     * Sends the message to every node of the options' cluster topology, one send operation per node.
     *
     * <p>
     * If the topology is empty then the callback is notified immediately with an empty result. Otherwise each per-node
     * outcome is recorded in a shared {@code BroadcastContext}, which gets completed as soon as one of its recording
     * methods returns {@code true}. Nodes that fail with {@link UnknownRouteException} are forgotten rather than
     * recorded as failures.
     * </p>
     *
     * @param obj Passed through to each underlying send operation.
     * @param t Message (must not be {@code null}).
     * @param messagingOpts Messaging options (must not be {@code null}).
     * @param broadcastCallback Callback (must not be {@code null}).
     */
    public void broadcast(Object obj, T t, MessagingOpts<T> messagingOpts, BroadcastCallback<T> broadcastCallback) {
        if (!$assertionsDisabled && t == null) {
            throw new AssertionError("Message must be not null.");
        }
        if (!$assertionsDisabled && messagingOpts == null) {
            throw new AssertionError("Messaging options must be not null.");
        }
        if (!$assertionsDisabled && broadcastCallback == null) {
            throw new AssertionError("Callback must be not null.");
        }

        checkMessageType(t);

        Set<ClusterNode> targets = messagingOpts.cluster().topology().nodeSet();

        if (targets.isEmpty()) {
            broadcastCallback.onComplete(null, new EmptyBroadcastResult(t));
        } else {
            BroadcastContext ctx = new BroadcastContext(t, targets, broadcastCallback);

            for (ClusterNode target : targets) {
                send(obj, t, messagingOpts.forSingleNode(target), err -> {
                    boolean done;

                    if (err == null) {
                        done = ctx.onSendSuccess(target);
                    } else if (err instanceof UnknownRouteException) {
                        done = ctx.forgetNode(target);
                    } else {
                        done = ctx.onSendFailure(target, err);
                    }

                    if (done) {
                        ctx.complete();
                    }
                });
            }
        }
    }

    /**
     * Future-based variant of {@link #aggregate(Object, Object, MessagingOpts, AggregateCallback)}.
     *
     * @param obj Passed through to each underlying request operation.
     * @param t Message.
     * @param messagingOpts Messaging options.
     *
     * @return Future that is completed when the aggregation callback gets notified.
     */
    public AggregateFuture<T> aggregate(Object obj, T t, MessagingOpts<T> messagingOpts) {
        AggregateCallbackFuture<T> future = new AggregateCallbackFuture<>();

        // The future is its own callback.
        aggregate(obj, t, messagingOpts, future);

        return future;
    }

    /**
     * Submits the request to every node of the options' cluster topology, one request operation per node.
     *
     * <p>
     * If the topology is empty then the callback is notified immediately with an empty result. Otherwise each per-node
     * outcome is recorded in a shared {@code AggregateContext}, which gets completed as soon as one of its recording
     * methods returns {@code true}. Nodes that fail with {@link UnknownRouteException} are forgotten rather than
     * recorded as failures.
     * </p>
     *
     * @param obj Passed through to each underlying request operation.
     * @param t Message (must not be {@code null}).
     * @param messagingOpts Messaging options (must not be {@code null}).
     * @param aggregateCallback Callback (must not be {@code null}).
     */
    public void aggregate(Object obj, T t, MessagingOpts<T> messagingOpts, AggregateCallback<T> aggregateCallback) {
        if (!$assertionsDisabled && t == null) {
            throw new AssertionError("Message must be not null.");
        }
        if (!$assertionsDisabled && messagingOpts == null) {
            throw new AssertionError("Messaging options must be not null.");
        }
        if (!$assertionsDisabled && aggregateCallback == null) {
            throw new AssertionError("Callback must be not null.");
        }

        checkMessageType(t);

        List<ClusterNode> targets = messagingOpts.cluster().topology().nodes();

        if (targets.isEmpty()) {
            aggregateCallback.onComplete(null, new EmptyAggregateResult(t));
        } else {
            AggregateContext ctx = new AggregateContext(t, targets, aggregateCallback);

            for (ClusterNode target : targets) {
                request(obj, t, messagingOpts.forSingleNode(target), (err, reply) -> {
                    boolean done;

                    if (err == null) {
                        done = ctx.onReplySuccess(target, reply);
                    } else if (err instanceof UnknownRouteException) {
                        done = ctx.forgetNode(target);
                    } else {
                        done = ctx.onReplyFailure(target, err);
                    }

                    if (done) {
                        ctx.complete();
                    }
                });
            }
        }
    }

    /**
     * Closes this gateway.
     *
     * <p>
     * Invokes the optional {@link CloseCallback} first and then, under the write lock: marks the gateway as closed,
     * terminates the send back pressure guard (if any), closes and forgets all clients, and disconnects all registered
     * inbound connections. Nothing is awaited while the lock is held; all pending work is returned to the caller.
     * </p>
     *
     * @return {@link Waiting#NO_WAIT} if the gateway is already closed; otherwise a handle that awaits (in this order)
     * all client/inbound disconnect futures, termination of the executor and completion of that termination.
     */
    public Waiting close() {
        if (this.onBeforeClose != null) {
            this.onBeforeClose.onBeforeClose();
        }

        List<Waiting> pending = new ArrayList<>();

        long writeLock = this.lock.writeLock();

        try {
            if (this.closed) {
                return Waiting.NO_WAIT;
            }

            if (this.debug) {
                this.log.debug("Closing channel [name={}]", this.name);
            }

            // Mark as closed.
            this.closed = true;
            this.clientsTopology = null;

            if (this.sendPressure != null) {
                this.sendPressure.terminate();
            }

            // Close all clients and remember their disconnect futures.
            for (MessagingClient<T> client : this.clients.values()) {
                client.close().forEach(future -> pending.add(future::join));
            }

            this.clients.clear();

            // Work on a snapshot of the inbound connections so that disconnect() runs outside of the 'inbound' monitor.
            List<MessagingConnectionNetIn<T>> inboundSnapshot;

            synchronized (this.inbound) {
                inboundSnapshot = new ArrayList<>(this.inbound);

                this.inbound.clear();
            }

            // disconnect() may return null; such connections have nothing to wait for.
            inboundSnapshot.stream()
                .map(MessagingConnectionNetIn::disconnect)
                .filter(Objects::nonNull)
                .forEach(future -> pending.add(future::join));

            // Executor termination goes last: first initiate it, then await its completion.
            pending.add(this.async::terminate);
            pending.add(this.async::awaitTermination);
        } finally {
            this.lock.unlockWrite(writeLock);
        }

        return Waiting.awaitAll(pending);
    }

    /**
     * Returns the message receiver that was supplied to the constructor.
     *
     * @return Message receiver.
     */
    public MessageReceiver<T> receiver() {
        return receiver;
    }

    /**
     * Returns the pooled worker of this gateway's {@link MessagingExecutor}.
     *
     * @return Executor.
     */
    public Executor executor() {
        return async.pooledWorker();
    }

    /**
     * Returns the {@link Hekate} instance by delegating to the {@link HekateSupport} that was supplied to the
     * constructor.
     *
     * @return Hekate instance.
     */
    @Override // io.hekate.core.HekateSupport
    public Hekate hekate() {
        return hekate.hekate();
    }

    /**
     * Returns the base type of messages of this channel.
     *
     * @return Base message type.
     */
    public Class<T> baseType() {
        return baseType;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the default channel that was created by the constructor.
     *
     * @return Default channel.
     */
    public DefaultMessagingChannel<T> channel() {
        return channel;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the executor of this gateway.
     *
     * @return Messaging executor.
     */
    public MessagingExecutor async() {
        return async;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the metrics callback that was supplied to the constructor.
     *
     * @return Metrics callback.
     */
    public MetricsCallback metrics() {
        return metrics;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the receive-side back pressure guard that was supplied to the constructor.
     *
     * @return Receive pressure guard.
     */
    public ReceivePressureGuard receiveGuard() {
        return receivePressure;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the send-side back pressure guard that was supplied to the constructor.
     *
     * @return Send pressure guard.
     */
    public SendPressureGuard sendGuard() {
        return sendPressure;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Selects a route for the message and sends it; routing failures are reported to the callback asynchronously.
     *
     * @param messageContext Message context.
     * @param sendCallback Callback.
     * @param failureInfo Failure of the previous attempt ({@code null} for the first attempt).
     */
    public void routeAndSend(MessageContext<T> messageContext, SendCallback sendCallback, FailureInfo failureInfo) {
        MessageRoute<T> selected;

        try {
            selected = route(messageContext, failureInfo);
        } catch (HekateException e) {
            notifyOnErrorAsync(messageContext, sendCallback, e);

            return;
        } catch (ClientSelectionRejectedException e) {
            // Report the real cause instead of the carrier exception.
            notifyOnErrorAsync(messageContext, sendCallback, e.getCause());

            return;
        } catch (Error | RuntimeException e) {
            if (log.isErrorEnabled()) {
                log.error("Got an unexpected runtime error during message routing.", e);
            }

            notifyOnErrorAsync(messageContext, sendCallback, e);

            return;
        }

        if (selected != null) {
            doSend(messageContext, selected, sendCallback, failureInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends the message via the client of the given route and handles the outcome.
     *
     * <p>
     * If the send operation fails and a failover policy is configured in the messaging options then the failure is
     * handed over to {@code failoverAsync(...)}; depending on the resulting {@link FailoverRoutingPolicy} the message is
     * re-sent over the same route or re-routed. In all other cases the context is completed, the back pressure slot is
     * released and the callback (if any) is notified.
     * </p>
     *
     * @param messageContext Message context.
     * @param messageRoute Selected route.
     * @param sendCallback Callback (may be {@code null}).
     * @param failureInfo Failure of the previous attempt ({@code null} for the first attempt); a non-null value makes
     * the client get called with its last (boolean) argument set to {@code true}.
     */
    public void doSend(MessageContext<T> messageContext, MessageRoute<T> messageRoute, SendCallback sendCallback, FailureInfo failureInfo) {
        messageRoute.client().send(messageRoute, th -> {
            messageRoute.client().touch();
            if (th != null && messageContext.opts().failover() != null) {
                failoverAsync(messageContext, th, messageRoute.client().node(), new FailoverCallback() {
                    @Override // io.hekate.messaging.internal.MessagingGateway.FailoverCallback
                    public void retry(FailoverRoutingPolicy failoverRoutingPolicy, FailureInfo failureInfo2) {
                        // Switch on the enum itself rather than on ordinal-derived numbers.
                        switch (failoverRoutingPolicy) {
                            case RETRY_SAME_NODE:
                                MessagingGateway.this.doSend(messageContext, messageRoute, sendCallback, failureInfo2);
                                return;
                            case PREFER_SAME_NODE:
                                // Stick to the same node only while it is still known to this gateway.
                                if (MessagingGateway.this.isKnownNode(messageRoute.client().node())) {
                                    MessagingGateway.this.doSend(messageContext, messageRoute, sendCallback, failureInfo2);
                                } else {
                                    MessagingGateway.this.routeAndSend(messageContext, sendCallback, failureInfo2);
                                }
                                return;
                            case RE_ROUTE:
                                MessagingGateway.this.routeAndSend(messageContext, sendCallback, failureInfo2);
                                return;
                            default:
                                throw new IllegalArgumentException("Unexpected routing policy: " + failoverRoutingPolicy);
                        }
                    }

                    @Override // io.hekate.messaging.internal.MessagingGateway.FailoverCallback
                    public void fail(Throwable cause) {
                        MessagingGateway.this.notifyOnErrorAsync(messageContext, sendCallback, cause);
                    }
                }, failureInfo);
            } else if (messageContext.complete()) {
                // complete() returning true guards the release/notification so that it happens once per context.
                backPressureRelease();
                if (sendCallback != null) {
                    sendCallback.onComplete(th);
                }
            }
        }, failureInfo != null);
    }

    /**
     * Acquires a back pressure slot for the request, schedules a timeout if the acquisition returned a positive value,
     * and then routes and submits the request. Back pressure failures are reported to the callback asynchronously.
     *
     * @param t Request message.
     * @param messageContext Message context.
     * @param responseCallback Callback.
     */
    private void requestAsync(T t, MessageContext<T> messageContext, ResponseCallback<T> responseCallback) {
        try {
            long timeout = backPressureAcquire(messageContext.opts().timeout(), t);

            // Only a positive value results in a scheduled timeout.
            if (timeout > 0) {
                doScheduleTimeout(timeout, messageContext, responseCallback);
            }

            routeAndRequest(messageContext, responseCallback, null);
        } catch (MessageQueueOverflowException | MessageQueueTimeoutException | InterruptedException e) {
            notifyOnErrorAsync(messageContext, responseCallback, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Selects a route for the request and submits it; routing failures are reported to the callback asynchronously.
     *
     * @param messageContext Message context.
     * @param responseCallback Callback.
     * @param failureInfo Failure of the previous attempt ({@code null} for the first attempt).
     */
    public void routeAndRequest(MessageContext<T> messageContext, ResponseCallback<T> responseCallback, FailureInfo failureInfo) {
        MessageRoute<T> selected;

        try {
            selected = route(messageContext, failureInfo);
        } catch (HekateException e) {
            notifyOnErrorAsync(messageContext, responseCallback, e);

            return;
        } catch (ClientSelectionRejectedException e) {
            // Report the real cause instead of the carrier exception.
            notifyOnErrorAsync(messageContext, responseCallback, e.getCause());

            return;
        } catch (Error | RuntimeException e) {
            if (log.isErrorEnabled()) {
                log.error("Got an unexpected runtime error during message routing.", e);
            }

            notifyOnErrorAsync(messageContext, responseCallback, e);

            return;
        }

        if (selected != null) {
            doRequest(messageContext, selected, responseCallback, failureInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits the request (or stream request) via the client of the given route and handles every response.
     *
     * <p>
     * For each response: a successful reply is first checked with {@code tryConvertToError(...)} and is treated as a
     * failure if that returns an error. The callback's {@code accept(...)} decision ({@code null} means
     * {@link ReplyDecision#DEFAULT}) together with the presence of an error and of a failover policy determines
     * whether failover is attempted. Failover is attempted when a policy is configured, the decision is not
     * {@code COMPLETE}, and either the decision is not {@code DEFAULT} or there is an error. Otherwise partial
     * responses are forwarded to the callback while the context is not completed, and a final response (or error)
     * completes the context, unregisters the request, releases the back pressure slot and notifies the callback.
     * </p>
     *
     * @param messageContext Message context.
     * @param messageRoute Selected route.
     * @param responseCallback Callback.
     * @param failureInfo Failure of the previous attempt ({@code null} for the first attempt); a non-null value makes
     * the client get called with its last (boolean) argument set to {@code true}.
     */
    public void doRequest(MessageContext<T> messageContext, MessageRoute<T> messageRoute, ResponseCallback<T> responseCallback, FailureInfo failureInfo) {
        InternalRequestCallback<T> internalRequestCallback = (requestHandle, th, response) -> {
            messageRoute.client().touch();
            MessagingEndpoint<T> messagingEndpoint = null;
            T t = null;
            if (th == null) {
                t = response.get();
                messagingEndpoint = response.endpoint();
                // A successful reply can still carry an error; if so, hide the payload from accept(...).
                th = tryConvertToError(t, messageRoute.receiver());
                if (th != null) {
                    t = null;
                    messagingEndpoint = null;
                }
            }
            ReplyDecision accept = responseCallback.accept(th, t, messagingEndpoint);
            if (accept == null) {
                accept = ReplyDecision.DEFAULT;
            }
            FailoverPolicy failover = messageContext.opts().failover();
            if (accept != ReplyDecision.COMPLETE && ((accept != ReplyDecision.DEFAULT || th != null) && failover != null)) {
                requestHandle.unregister();
                if (accept == ReplyDecision.REJECT) {
                    th = new RejectedReplyException("Response was rejected by a request callback", t, th);
                }
                failoverAsync(messageContext, th, messageRoute.client().node(), new FailoverCallback() {
                    @Override // io.hekate.messaging.internal.MessagingGateway.FailoverCallback
                    public void retry(FailoverRoutingPolicy failoverRoutingPolicy, FailureInfo failureInfo2) {
                        // Switch on the enum itself rather than on ordinal-derived numbers.
                        switch (failoverRoutingPolicy) {
                            case RETRY_SAME_NODE:
                                MessagingGateway.this.doRequest(messageContext, messageRoute, responseCallback, failureInfo2);
                                return;
                            case PREFER_SAME_NODE:
                                // Stick to the same node only while it is still known to this gateway.
                                if (MessagingGateway.this.isKnownNode(messageRoute.client().node())) {
                                    MessagingGateway.this.doRequest(messageContext, messageRoute, responseCallback, failureInfo2);
                                } else {
                                    MessagingGateway.this.routeAndRequest(messageContext, responseCallback, failureInfo2);
                                }
                                return;
                            case RE_ROUTE:
                                MessagingGateway.this.routeAndRequest(messageContext, responseCallback, failureInfo2);
                                return;
                            default:
                                throw new IllegalArgumentException("Unexpected routing policy: " + failoverRoutingPolicy);
                        }
                    }

                    @Override // io.hekate.messaging.internal.MessagingGateway.FailoverCallback
                    public void fail(Throwable cause) {
                        MessagingGateway.this.notifyOnErrorAsync(messageContext, responseCallback, cause);
                    }
                }, failureInfo);
                return;
            }
            if (th == null && response.isPartial()) {
                // Partial responses are forwarded only while the context is still active.
                if (messageContext.isCompleted()) {
                    return;
                }
                responseCallback.onComplete(null, response);
            } else if (messageContext.complete()) {
                requestHandle.unregister();
                backPressureRelease();
                responseCallback.onComplete(th, response);
            }
        };
        if (messageContext.isStream()) {
            messageRoute.client().stream(messageRoute, internalRequestCallback, failureInfo != null);
        } else {
            messageRoute.client().request(messageRoute, internalRequestCallback, failureInfo != null);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers an inbound network connection with this gateway.
     *
     * <p>The connection is added to the {@code inbound} collection while the gateway's read lock is held, so that
     * registration cannot interleave with a state change that is performed under the write lock.</p>
     *
     * @param messagingConnectionNetIn Inbound connection to register.
     * @return {@code true} if the connection was registered; {@code false} if this gateway is already closed.
     */
    public boolean register(MessagingConnectionNetIn<T> messagingConnectionNetIn) {
        long readLock = this.lock.readLock();
        try {
            if (this.closed) {
                return false;
            }
            synchronized (this.inbound) {
                this.inbound.add(messagingConnectionNetIn);
            }
            return true;
        } finally {
            // Single release point: the stamp must be unlocked exactly once on every path
            // (unlocking the same stamp twice either fails or releases another reader's hold).
            this.lock.unlockRead(readLock);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a previously registered inbound connection from this gateway.
     *
     * <p>The removal runs under the gateway's read lock and additionally synchronizes on the {@code inbound}
     * collection itself.</p>
     *
     * @param messagingConnectionNetIn Inbound connection to remove.
     */
    public void unregister(MessagingConnectionNetIn<T> messagingConnectionNetIn) {
        final long stamp = this.lock.readLock();
        try {
            synchronized (this.inbound) {
                this.inbound.remove(messagingConnectionNetIn);
            }
        } finally {
            this.lock.unlockRead(stamp);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks every known client to disconnect if it is idle.
     *
     * <p>Does nothing if this gateway is closed. The whole pass runs under the read lock.</p>
     */
    public void checkIdleConnections() {
        final long stamp = this.lock.readLock();
        try {
            if (this.closed) {
                return;
            }
            this.clients.values().forEach(MessagingClient::disconnectIfIdle);
        } finally {
            this.lock.unlockRead(stamp);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Synchronizes the set of messaging clients with the latest cluster topology.
     *
     * <p>Under the write lock, and only if this gateway is not closed and the cluster topology version is newer
     * than the one the clients were built from (or no topology was applied yet), this method:</p>
     * <ol>
     * <li>computes which nodes were added and which were removed;</li>
     * <li>removes the clients of removed nodes from the {@code clients} map;</li>
     * <li>creates and registers a client for each added node;</li>
     * <li>remembers the new topology;</li>
     * <li>closes the clients that were removed.</li>
     * </ol>
     */
    public void updateTopology() {
        long writeLock = this.lock.writeLock();
        try {
            if (this.closed) {
                return;
            }
            ClusterTopology latestTopology = this.cluster.topology();
            if (this.clientsTopology != null && this.clientsTopology.version() >= latestTopology.version()) {
                // Clients are already in sync with this (or a newer) topology version.
                return;
            }
            if (this.debug) {
                this.log.debug("Updating topology [channel={}, topology={}]", this.name, latestTopology);
            }
            Set<ClusterNode> latestNodes = latestTopology.nodeSet();
            Set<ClusterNode> added;
            Set<ClusterNode> removed;
            if (this.clientsTopology == null) {
                // First update: every node is new and nothing can be removed.
                added = new HashSet<>(latestNodes);
                removed = Collections.emptySet();
            } else {
                added = new HashSet<>();
                removed = new HashSet<>();
                for (ClusterNode node : latestNodes) {
                    if (!this.clientsTopology.contains(node)) {
                        added.add(node);
                    }
                }
                for (ClusterNode node : this.clientsTopology) {
                    if (!latestNodes.contains(node)) {
                        removed.add(node);
                    }
                }
            }
            // Detach clients of removed nodes first; they get closed once the new state is fully applied.
            List<MessagingClient<T>> clientsToClose = null;
            if (!removed.isEmpty()) {
                clientsToClose = removed.stream()
                    .map(node -> this.clients.remove(node.id()))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            }
            for (ClusterNode node : added) {
                this.clients.put(node.id(), createClient(node));
            }
            this.clientsTopology = latestTopology;
            if (clientsToClose != null) {
                clientsToClose.forEach(MessagingClient::close);
            }
        } finally {
            this.lock.unlockWrite(writeLock);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules a timeout for the specified messaging operation, using the timeout value from the operation's
     * options.
     *
     * @param messageContext Context of the messaging operation.
     * @param obj Callback to notify if the operation times out (passed through to {@code doScheduleTimeout}).
     */
    public void scheduleTimeout(MessageContext<T> messageContext, Object obj) {
        long timeout = messageContext.opts().timeout();
        doScheduleTimeout(timeout, messageContext, obj);
    }

    /**
     * Looks up the messaging client that is registered for the specified node identifier.
     *
     * @param clusterNodeId Node identifier.
     * @return Client from the {@code clients} map, or {@code null} if the map has no entry for this identifier.
     * @throws MessagingException Declared by the signature; the lookup itself does not throw it.
     */
    MessagingClient<T> clientOf(ClusterNodeId clusterNodeId) throws MessagingException {
        final long stamp = this.lock.readLock();
        try {
            return this.clients.get(clusterNodeId);
        } finally {
            this.lock.unlockRead(stamp);
        }
    }

    /**
     * Schedules a deferred task on the operation's worker that fails the operation with a
     * {@link MessageTimeoutException} once the delay elapses.
     *
     * <p>Nothing is scheduled if the operation is already completed. When the task fires, the callback is notified
     * only if {@code completeOnTimeout()} reports that the timeout was the one to complete the operation.</p>
     *
     * @param j Delay that is passed to the worker's {@code executeDeferred(...)}.
     * @param messageContext Context of the messaging operation.
     * @param obj Callback to notify on timeout.
     */
    private void doScheduleTimeout(long j, MessageContext<T> messageContext, Object obj) {
        if (!messageContext.isCompleted()) {
            try {
                messageContext.setTimeoutFuture(messageContext.worker().executeDeferred(j, () -> {
                    if (!messageContext.completeOnTimeout()) {
                        return;
                    }
                    String message = "Messaging operation timed out [message=" + messageContext.originalMessage() + ']';
                    doNotifyOnError(obj, new MessageTimeoutException(message));
                }));
            } catch (RejectedExecutionException ignored) {
                // Deliberately ignored: the worker refused the task, so no timeout gets scheduled.
                // NOTE(review): presumably this only happens when the worker is shutting down - confirm.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks whether a messaging client is registered for the specified node.
     *
     * @param clusterNode Node to check.
     * @return {@code true} if the {@code clients} map contains the node's identifier.
     */
    public boolean isKnownNode(ClusterNode clusterNode) {
        final long stamp = this.lock.readLock();
        try {
            return this.clients.containsKey(clusterNode.id());
        } finally {
            this.lock.unlockRead(stamp);
        }
    }

    /**
     * Selects a target node for the specified messaging operation and resolves it to a messaging client.
     *
     * <p>The target is either the first node of the operation's topology (if no load balancer is configured) or
     * the node chosen by the load balancer. If no client is registered for the selected node and the cluster
     * topology version differs from the one the clients were built from, the clients are refreshed via
     * {@link #updateTopology()} and routing is retried.</p>
     *
     * <p>Whenever routing fails during a failover attempt (i.e. {@code failureInfo} carries an error), the
     * original error is reported through {@link ClientSelectionRejectedException} instead of a routing-specific
     * exception.</p>
     *
     * @param messageContext Context of the messaging operation.
     * @param failureInfo Information about the previous failure, or {@code null} if this is not a failover attempt.
     * @return Route to the selected client.
     * @throws HekateException If the topology is empty, no node could be selected, the selected node is not within
     * the channel topology, or the channel is closed.
     * @throws ClientSelectionRejectedException If routing failed during a failover attempt.
     */
    private MessageRoute<T> route(MessageContext<T> messageContext, FailureInfo failureInfo) throws HekateException, ClientSelectionRejectedException {
        if (!$assertionsDisabled && messageContext == null) {
            throw new AssertionError("Message context is null.");
        }
        while (true) {
            ClusterTopology topology = messageContext.opts().cluster().topology();
            if (topology.isEmpty()) {
                Throwable cause = failureInfo != null ? failureInfo.error() : null;
                if (cause == null) {
                    throw new EmptyTopologyException("No suitable receivers [channel=" + this.name + ']');
                }
                throw new ClientSelectionRejectedException(cause);
            }
            ClusterNodeId targetId;
            if (messageContext.opts().balancer() == null) {
                ClusterNode first = topology.first();
                targetId = first != null ? first.id() : null;
            } else {
                targetId = messageContext.opts().balancer().route(messageContext.originalMessage(), new DefaultLoadBalancerContext(messageContext, topology, this.hekate, messageContext.opts().partitions(), Optional.ofNullable(failureInfo)));
            }
            if (targetId == null) {
                Throwable cause = failureInfo != null ? failureInfo.error() : null;
                if (cause == null) {
                    throw new UnknownRouteException("Load balancer failed to select a target node.");
                }
                throw new ClientSelectionRejectedException(cause);
            }
            long readLock = this.lock.readLock();
            try {
                if (this.closed) {
                    throw channelClosedError(null);
                }
                MessagingClient<T> client = this.clients.get(targetId);
                if (client != null) {
                    // The read lock is released by the finally block only (exactly one unlock per stamp).
                    return new MessageRoute<>(client, topology, messageContext, this.interceptor);
                }
                // No client for the selected node: fail only if the clients are in sync with the cluster topology.
                ClusterTopology latestTopology = this.cluster.topology();
                if (this.clientsTopology != null && this.clientsTopology.version() == latestTopology.version()) {
                    Throwable cause = failureInfo != null ? failureInfo.error() : null;
                    if (cause == null) {
                        throw new UnknownRouteException("Node is not within the channel topology [id=" + targetId + ']');
                    }
                    throw new ClientSelectionRejectedException(cause);
                }
                if (this.debug) {
                    this.log.debug("Retrying routing since topology was changed [balancer={}]", messageContext.opts().balancer());
                }
            } finally {
                this.lock.unlockRead(readLock);
            }
            // Must run after the read lock is released: updateTopology() acquires the write lock of the same
            // stamp-based lock, which would block forever if this thread still held a read stamp.
            updateTopology();
        }
    }

    /**
     * Submits failover processing of a failed operation to the operation's worker.
     *
     * <p>The async-enqueue hook is invoked before the task is submitted and the async-dequeue hook is invoked
     * when the task starts running; the task then delegates to {@code failover(...)} with the same arguments.</p>
     *
     * @param messageContext Context of the failed messaging operation.
     * @param th Failure cause.
     * @param clusterNode Node on which the operation failed.
     * @param failoverCallback Callback to notify about the failover decision.
     * @param failureInfo Information about the previous failure, or {@code null}.
     */
    private void failoverAsync(MessageContext<T> messageContext, Throwable th, ClusterNode clusterNode, FailoverCallback failoverCallback, FailureInfo failureInfo) {
        onAsyncEnqueue();
        MessagingWorker worker = messageContext.worker();
        worker.execute(() -> {
            onAsyncDequeue();
            failover(messageContext, th, clusterNode, failoverCallback, failureInfo);
        });
    }

    /* JADX WARN: Finally extract failed */
    /**
     * Applies the operation's failover policy to a failure and either schedules a retry or reports the failure.
     *
     * <p>Nothing happens if the operation is already completed. A retry is scheduled only if all of the following
     * hold: a failover policy is configured, the error is recoverable, the channel is not closed, the policy
     * resolved to a retry, and - for the {@link FailoverRoutingPolicy#RETRY_SAME_NODE} routing - the failed node
     * still has a registered client. In every other case (including an unexpected runtime error while applying
     * the policy, which is logged) the callback's {@code fail(...)} is invoked; if the channel is closed, the
     * reported error is a channel-closed error that wraps the original cause.</p>
     *
     * @param messageContext Context of the failed messaging operation.
     * @param th Failure cause.
     * @param clusterNode Node on which the operation failed.
     * @param failoverCallback Callback to notify about the failover decision.
     * @param failureInfo Information about the previous failure, or {@code null}.
     */
    private void failover(MessageContext<T> messageContext, Throwable th, ClusterNode clusterNode, FailoverCallback failoverCallback, FailureInfo failureInfo) {
        if (!$assertionsDisabled && messageContext == null) {
            throw new AssertionError("Message context is null.");
        }
        if (!$assertionsDisabled && th == null) {
            throw new AssertionError("Cause is null.");
        }
        if (!$assertionsDisabled && clusterNode == null) {
            throw new AssertionError("Failed node is null.");
        }
        if (!$assertionsDisabled && failoverCallback == null) {
            throw new AssertionError("Failover callback is null");
        }
        if (messageContext.isCompleted()) {
            return;
        }
        Throwable finalCause = th;
        boolean retryScheduled = false;
        FailoverPolicy policy = messageContext.opts().failover();
        if (policy != null && isRecoverable(th)) {
            DefaultFailoverContext failoverCtx = newFailoverContext(th, clusterNode, failureInfo);
            try {
                FailureResolution resolution = policy.apply(failoverCtx);
                final long stamp = this.lock.readLock();
                try {
                    if (this.closed) {
                        finalCause = channelClosedError(th);
                    } else if (resolution != null && resolution.isRetry()) {
                        FailoverRoutingPolicy routing = resolution.routing();
                        // Retrying on the same node only makes sense while that node still has a client.
                        if (routing != FailoverRoutingPolicy.RETRY_SAME_NODE || this.clients.containsKey(clusterNode.id())) {
                            onRetry();
                            MessagingWorker worker = messageContext.worker();
                            onAsyncEnqueue();
                            worker.executeDeferred(resolution.delay(), () -> {
                                onAsyncDequeue();
                                try {
                                    failoverCallback.retry(routing, failoverCtx.withRouting(routing));
                                } catch (Error | RuntimeException e) {
                                    this.log.error("Got an unexpected error during failover task processing.", e);
                                }
                            });
                            retryScheduled = true;
                        }
                    }
                } finally {
                    this.lock.unlockRead(stamp);
                }
            } catch (Error | RuntimeException e) {
                this.log.error("Got an unexpected error while applying failover policy.", e);
            }
        }
        if (!retryScheduled) {
            failoverCallback.fail(finalCause);
        }
    }

    /**
     * Builds the failover context for a new failure.
     *
     * <p>For the first failure ({@code failureInfo == null}) the attempt is {@code 0}, the routing is
     * {@link FailoverRoutingPolicy#RETRY_SAME_NODE} and the set of failed nodes contains only the failed node.
     * Otherwise the attempt is incremented, the routing is inherited from the previous failure and the failed node
     * is added to a copy of the previously failed nodes.</p>
     *
     * @param th Failure cause.
     * @param clusterNode Node on which the operation failed.
     * @param failureInfo Information about the previous failure, or {@code null} if this is the first failure.
     * @return New failover context.
     */
    private DefaultFailoverContext newFailoverContext(Throwable th, ClusterNode clusterNode, FailureInfo failureInfo) {
        if (failureInfo == null) {
            return new DefaultFailoverContext(0, th, clusterNode, Collections.singleton(clusterNode), FailoverRoutingPolicy.RETRY_SAME_NODE);
        }
        int attempt = failureInfo.attempt() + 1;
        FailoverRoutingPolicy routing = failureInfo.routing();
        // Copy before adding: the set that is exposed by the previous failure must not be modified.
        Set<ClusterNode> failedNodes = new HashSet<>(failureInfo.allFailedNodes());
        failedNodes.add(clusterNode);
        return new DefaultFailoverContext(attempt, th, clusterNode, Collections.unmodifiableSet(failedNodes), routing);
    }

    /**
     * Tells whether failover may be attempted for the specified error.
     *
     * @param th Error to check.
     * @return {@code false} for {@link MessagingChannelClosedException} and {@link CodecException}; {@code true}
     * for any other error (including {@code null}).
     */
    private boolean isRecoverable(Throwable th) {
        return !(th instanceof MessagingChannelClosedException || th instanceof CodecException);
    }

    /**
     * Creates a messaging client for the specified node.
     *
     * @param clusterNode Target node.
     * @return In-memory client if the target is the local node; network client otherwise.
     */
    private MessagingClient<T> createClient(ClusterNode clusterNode) {
        if (this.localNode.equals(clusterNode)) {
            return new MessagingClientMem(this.localNode, this);
        }
        return new MessagingClientNet(this.name, clusterNode, this.net, this, this.checkIdle);
    }

    /**
     * Passes an outgoing message through the send back pressure guard, if one is configured.
     *
     * @param j Value that is forwarded to the guard's {@code onEnqueue(...)} and returned as is when no guard is
     * configured.
     * @param t Message.
     * @return Result of the guard's {@code onEnqueue(...)}, or {@code j} if there is no guard.
     * @throws MessageQueueOverflowException Propagated from the guard.
     * @throws InterruptedException Propagated from the guard.
     * @throws MessageQueueTimeoutException Propagated from the guard.
     */
    private long backPressureAcquire(long j, T t) throws MessageQueueOverflowException, InterruptedException, MessageQueueTimeoutException {
        if (this.sendPressure == null) {
            return j;
        }
        return this.sendPressure.onEnqueue(j, t);
    }

    /**
     * Notifies the send back pressure guard (if one is configured) that a message left the queue.
     */
    private void backPressureRelease() {
        if (this.sendPressure == null) {
            return;
        }
        this.sendPressure.onDequeue();
    }

    /**
     * Reports to the metrics (if enabled) that an asynchronous task was taken from a worker's queue and started.
     */
    private void onAsyncDequeue() {
        if (this.metrics != null) {
            this.metrics.onAsyncDequeue();
        }
    }

    /**
     * Reports to the metrics (if enabled) that an asynchronous task is about to be submitted to a worker.
     */
    private void onAsyncEnqueue() {
        if (this.metrics != null) {
            this.metrics.onAsyncEnqueue();
        }
    }

    /**
     * Reports a retry to the metrics, if metrics are enabled.
     */
    private void onRetry() {
        if (this.metrics == null) {
            return;
        }
        this.metrics.onRetry();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asynchronously completes the operation with an error.
     *
     * <p>The task runs on the operation's worker; the callback is notified only if this task is the one that
     * completes the operation ({@code messageContext.complete()} returns {@code true}).</p>
     *
     * @param messageContext Context of the messaging operation.
     * @param obj Callback to notify (see {@code doNotifyOnError(...)} for the supported types).
     * @param th Error to report.
     */
    public void notifyOnErrorAsync(MessageContext<T> messageContext, Object obj, Throwable th) {
        onAsyncEnqueue();
        MessagingWorker worker = messageContext.worker();
        worker.execute(() -> {
            onAsyncDequeue();
            if (!messageContext.complete()) {
                return;
            }
            doNotifyOnError(obj, th);
        });
    }

    /**
     * Releases back pressure and notifies the callback about an error.
     *
     * <p>Back pressure is always released first, even if there is no callback. Runtime errors that are thrown by
     * the callback itself are logged and not propagated.</p>
     *
     * @param obj Callback to notify: a {@link SendCallback}, a {@link ResponseCallback} or {@code null} (no
     * notification).
     * @param th Error to report.
     * @throws IllegalArgumentException If the callback is of any other type.
     */
    private void doNotifyOnError(Object obj, Throwable th) {
        backPressureRelease();
        if (obj == null) {
            return;
        }
        if (obj instanceof SendCallback) {
            try {
                ((SendCallback) obj).onComplete(th);
            } catch (Error | RuntimeException e) {
                this.log.error("Got an unexpected runtime error while notifying send callback on another error [cause={}]", th, e);
            }
        } else if (obj instanceof ResponseCallback) {
            try {
                ((ResponseCallback) obj).onComplete(th, null);
            } catch (Error | RuntimeException e) {
                this.log.error("Got an unexpected runtime error while notifying request callback on another error [cause={}]", th, e);
            }
        } else {
            throw new IllegalArgumentException("Unexpected callback type: " + obj);
        }
    }

    /**
     * Creates a message context with the trailing boolean flag of the four-argument overload set to
     * {@code false}.
     *
     * @param obj Object from which the affinity value is derived (may be {@code null}).
     * @param t Message.
     * @param messagingOpts Messaging options.
     * @return New message context.
     */
    private MessageContext<T> newContext(Object obj, T t, MessagingOpts<T> messagingOpts) {
        final boolean flag = false;
        return newContext(obj, t, messagingOpts, flag);
    }

    /**
     * Creates a message context and selects the worker that will process it.
     *
     * <p>If {@code obj} is not {@code null} or {@code z} is set, the worker is chosen by the affinity value via
     * {@code workerFor(...)}; otherwise the pooled worker is used.</p>
     *
     * @param obj Object from which the affinity value is derived (may be {@code null}).
     * @param t Message.
     * @param messagingOpts Messaging options.
     * @param z Flag that is passed through to the {@link MessageContext} constructor.
     * NOTE(review): presumably marks a streaming operation - confirm against the constructor.
     * @return New message context.
     */
    private MessageContext<T> newContext(Object obj, T t, MessagingOpts<T> messagingOpts, boolean z) {
        int affinity = affinity(obj);
        MessagingWorker worker;
        if (obj != null || z) {
            worker = this.async.workerFor(affinity);
        } else {
            worker = this.async.pooledWorker();
        }
        return new MessageContext<>(t, affinity, obj, worker, messagingOpts, z);
    }

    /**
     * Creates (but does not throw) an error telling that this channel is closed.
     *
     * @param th Cause to attach to the error (may be {@code null}).
     * @return New error whose message includes the channel name.
     */
    private MessagingChannelClosedException channelClosedError(Throwable th) {
        String message = "Channel closed [channel=" + this.name + ']';
        return new MessagingChannelClosedException(message, th);
    }

    /**
     * Converts a reply to an error if the reply is a {@link FailureResponse}.
     *
     * @param t Reply message.
     * @param clusterNode Node that is passed to {@link FailureResponse#asError}.
     * @return {@code null} if the reply is not a {@link FailureResponse}; otherwise the error produced by the
     * reply, or an {@link IllegalArgumentException} if the reply produced a {@code null} error.
     */
    private Throwable tryConvertToError(T t, ClusterNode clusterNode) {
        if (!(t instanceof FailureResponse)) {
            return null;
        }
        Throwable error = ((FailureResponse) t).asError(clusterNode);
        if (error != null) {
            return error;
        }
        return new IllegalArgumentException(FailureResponse.class.getSimpleName() + " message returned null error [message=" + t + ']');
    }

    /**
     * Verifies that a message is an instance of this channel's base type.
     *
     * @param t Message to check.
     * @throws ClassCastException If the message is not an instance of the channel's base type.
     */
    private void checkMessageType(T t) {
        if (!$assertionsDisabled && t == null) {
            throw new AssertionError("Message must be not null.");
        }
        if (this.baseType.isInstance(t)) {
            return;
        }
        String channelType = this.baseType.getName();
        String messageType = t.getClass().getName();
        throw new ClassCastException("Messaging channel doesn't support the specified type [channel-type=" + channelType + ", message-type=" + messageType + ']');
    }

    /**
     * Derives an affinity value from the specified object.
     *
     * @param obj Object to derive the value from (may be {@code null}).
     * @return Hash code of the object, or a random integer if the object is {@code null}.
     */
    private static int affinity(Object obj) {
        if (obj != null) {
            return obj.hashCode();
        }
        return ThreadLocalRandom.current().nextInt();
    }

    /**
     * Returns the string form of this gateway as produced by {@code ToString.format(...)}.
     *
     * @return String representation of this gateway.
     */
    @Override
    public String toString() {
        return ToString.format(this);
    }

    static {
        // Caches the negated assertion status of this class; the methods above consult this flag before
        // throwing AssertionError, so those checks run only when assertions are enabled.
        // NOTE(review): this looks like the compiler-generated backing flag of 'assert' statements that the
        // decompiler surfaced as an explicit field - confirm before editing by hand.
        $assertionsDisabled = !MessagingGateway.class.desiredAssertionStatus();
    }
}
