package io.hekate.messaging.internal;

import io.hekate.cluster.ClusterAcceptor;
import io.hekate.cluster.ClusterNode;
import io.hekate.cluster.ClusterNodeId;
import io.hekate.cluster.ClusterService;
import io.hekate.cluster.event.ClusterEvent;
import io.hekate.cluster.event.ClusterEventType;
import io.hekate.codec.CodecFactory;
import io.hekate.codec.CodecService;
import io.hekate.core.Hekate;
import io.hekate.core.HekateException;
import io.hekate.core.ServiceInfo;
import io.hekate.core.internal.util.ArgAssert;
import io.hekate.core.internal.util.ConfigCheck;
import io.hekate.core.internal.util.HekateThreadFactory;
import io.hekate.core.internal.util.StreamUtils;
import io.hekate.core.jmx.JmxService;
import io.hekate.core.service.ConfigurableService;
import io.hekate.core.service.ConfigurationContext;
import io.hekate.core.service.DependencyContext;
import io.hekate.core.service.DependentService;
import io.hekate.core.service.InitializationContext;
import io.hekate.core.service.InitializingService;
import io.hekate.core.service.TerminatingService;
import io.hekate.messaging.MessageReceiver;
import io.hekate.messaging.MessagingBackPressureConfig;
import io.hekate.messaging.MessagingChannel;
import io.hekate.messaging.MessagingChannelConfig;
import io.hekate.messaging.MessagingConfigProvider;
import io.hekate.messaging.MessagingOverflowPolicy;
import io.hekate.messaging.MessagingService;
import io.hekate.messaging.MessagingServiceFactory;
import io.hekate.messaging.intercept.MessageInterceptor;
import io.hekate.messaging.internal.MessagingProtocol;
import io.hekate.network.NetworkConfigProvider;
import io.hekate.network.NetworkConnector;
import io.hekate.network.NetworkConnectorConfig;
import io.hekate.network.NetworkEndpoint;
import io.hekate.network.NetworkMessage;
import io.hekate.network.NetworkServerHandler;
import io.hekate.network.NetworkService;
import io.hekate.util.StateGuard;
import io.hekate.util.async.AsyncUtils;
import io.hekate.util.async.ExtendedScheduledExecutor;
import io.hekate.util.async.Waiting;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hekate/messaging/internal/DefaultMessagingService.class */
public class DefaultMessagingService implements MessagingService, DependentService, ConfigurableService, InitializingService, TerminatingService, NetworkConfigProvider, ClusterAcceptor {
    // Class logger; assigned in the static initializer at the bottom of this class.
    private static final Logger log;
    // Snapshot of log.isDebugEnabled() taken once, when the class is initialized.
    private static final boolean DEBUG;
    // Thread name prefix. NOTE(review): not referenced in the visible code; newThreadFactory() uses the literal "Messaging-".
    private static final String MESSAGING_THREAD_PREFIX = "Messaging";
    // Source of channel configurations, global interceptors and config providers (see configure()).
    private final MessagingServiceFactory factory;
    // Shared scheduler; created in initialize() when at least one gateway exists, shut down and nulled in terminate().
    private ExtendedScheduledExecutor timer;
    // Required dependencies, assigned in resolve().
    private CodecService codec;
    private NetworkService net;
    private ClusterService cluster;
    // Optional dependency (resolved via DependencyContext.optional()); null-checked before use.
    private JmxService jmx;
    // Local node id; set in initialize(), cleared in terminate(), read by the server handler built in networkConfigFor().
    private volatile ClusterNodeId nodeId;
    // Decompiler-materialized assertion flag; assigned in the static initializer and checked in place of 'assert' statements.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Guards lifecycle state transitions and read/write access of this service.
    private final StateGuard guard = new StateGuard(MessagingService.class);
    // Registered gateways keyed by gateway name; populated by register().
    private final Map<String, MessagingGateway<?>> gateways = new HashMap();

    /**
     * Creates a new service instance backed by the given factory.
     *
     * @param messagingServiceFactory Factory that supplies the messaging configuration (must not be null; verified only when
     * assertions are enabled).
     */
    public DefaultMessagingService(MessagingServiceFactory messagingServiceFactory) {
        // Equivalent of an 'assert' statement, expressed through the explicit assertion flag of this class.
        if (messagingServiceFactory == null && !$assertionsDisabled) {
            throw new AssertionError("Factory is null.");
        }

        factory = messagingServiceFactory;
    }

    /**
     * Looks up the services this service depends on.
     *
     * <p>Network, cluster and codec services are required; the JMX service is optional and may remain {@code null}.</p>
     *
     * @param dependencyContext Context used to resolve dependencies.
     */
    @Override // io.hekate.core.service.DependentService
    public void resolve(DependencyContext dependencyContext) {
        // Mandatory dependencies.
        net = dependencyContext.require(NetworkService.class);
        cluster = dependencyContext.require(ClusterService.class);
        codec = dependencyContext.require(CodecService.class);

        // Optional dependency.
        jmx = dependencyContext.optional(JmxService.class);
    }

    /**
     * Registers all configured channels and publishes their meta-data as service properties.
     *
     * <p>Channels are collected, in this order, from: the factory's own channel list, the factory's configuration providers and
     * the {@link MessagingConfigProvider} components found via the configuration context.</p>
     *
     * @param configurationContext Configuration context.
     */
    @Override // io.hekate.core.service.ConfigurableService
    public void configure(ConfigurationContext configurationContext) {
        List<MessageInterceptor> interceptors = StreamUtils.nullSafe(factory.getGlobalInterceptors()).collect(Collectors.toList());

        // Channels that are declared directly on the factory.
        StreamUtils.nullSafe(factory.getChannels()).forEach(cfg -> register(cfg, interceptors));

        // Channels that are supplied by the factory's configuration providers.
        StreamUtils.nullSafe(factory.getConfigProviders()).forEach(provider ->
            StreamUtils.nullSafe(provider.configureMessaging()).forEach(cfg -> register(cfg, interceptors))
        );

        // Channels that are supplied by providers discovered through the configuration context.
        StreamUtils.nullSafe(configurationContext.findComponents(MessagingConfigProvider.class)).forEach(provider ->
            StreamUtils.nullSafe(provider.configureMessaging()).forEach(cfg -> register(cfg, interceptors))
        );

        // Publish meta-data of every registered channel (read back in acceptJoin()).
        for (MessagingGateway<?> gateway : gateways.values()) {
            String property = MessagingMetaData.propertyName(gateway.name());
            MessagingMetaData meta = new MessagingMetaData(gateway.hasReceiver(), gateway.baseType().getName());

            configurationContext.setStringProperty(property, meta.toString());
        }
    }

    /**
     * Verifies that the joining node uses the same base type as the local node for every locally registered channel.
     *
     * @param clusterNode Joining node.
     * @param hekate Local instance.
     *
     * @return Rejection reason if a channel's base type differs; {@code null} if the join is accepted (including the case when
     * the joining node doesn't provide {@link MessagingService} at all).
     */
    @Override // io.hekate.cluster.ClusterAcceptor
    public String acceptJoin(ClusterNode clusterNode, Hekate hekate) {
        if (clusterNode.hasService(MessagingService.class)) {
            ServiceInfo localInfo = hekate.localNode().service(MessagingService.class);
            ServiceInfo joiningInfo = clusterNode.service(MessagingService.class);

            for (MessagingGateway<?> gateway : gateways.values()) {
                String channel = gateway.name();
                String property = MessagingMetaData.propertyName(channel);

                MessagingMetaData localMeta = MessagingMetaData.parse(localInfo.stringProperty(property));
                MessagingMetaData joiningMeta = MessagingMetaData.parse(joiningInfo.stringProperty(property));

                // Channels that are unknown to the joining node are not compared.
                if (joiningMeta == null) {
                    continue;
                }

                if (!localMeta.type().equals(joiningMeta.type())) {
                    return "Invalid " + MessagingChannelConfig.class.getSimpleName() + " - 'baseType' value mismatch between the joining node and the cluster [channel=" + channel + ", joining-type=" + joiningMeta.type() + ", cluster-type=" + localMeta.type() + ", rejected-by=" + hekate.localNode().address() + ']';
                }
            }
        }

        return null;
    }

    /**
     * Builds one network connector configuration per registered gateway.
     *
     * @return Connector configurations, or an empty list if no gateways are registered.
     */
    @Override // io.hekate.network.NetworkConfigProvider
    public Collection<NetworkConnectorConfig<?>> configureNetwork() {
        if (gateways.isEmpty()) {
            return Collections.emptyList();
        }

        List<NetworkConnectorConfig<?>> connectors = new ArrayList<>(gateways.size());

        for (MessagingGateway<?> gateway : gateways.values()) {
            connectors.add(networkConfigFor(gateway));
        }

        return connectors;
    }

    /**
     * Switches the service to the initialized state and, if any gateways are registered, remembers the local node id, starts
     * the timer, initializes every gateway and subscribes to cluster JOIN/CHANGE events.
     *
     * @param initializationContext Initialization context.
     *
     * @throws HekateException If gateway initialization fails.
     */
    @Override // io.hekate.core.service.InitializingService
    public void initialize(InitializationContext initializationContext) throws HekateException {
        guard.lockWrite();

        try {
            guard.becomeInitialized();

            if (DEBUG) {
                log.debug("Initializing...");
            }

            boolean hasGateways = !gateways.isEmpty();

            if (hasGateways) {
                nodeId = initializationContext.localNode().id();
                timer = newTimer();

                for (MessagingGateway<?> gateway : gateways.values()) {
                    initializeGateway(gateway, initializationContext.metrics());
                }

                // Topology updates are forwarded to gateways via updateTopology(...).
                cluster.addListener(this::updateTopology, ClusterEventType.JOIN, ClusterEventType.CHANGE);
            }

            if (DEBUG) {
                log.debug("Initialized.");
            }
        } finally {
            guard.unlockWrite();
        }
    }

    /**
     * Moves the state guard to the terminating state while holding its write lock.
     *
     * @throws HekateException Declared by the service contract.
     */
    @Override // io.hekate.core.service.TerminatingService
    public void preTerminate() throws HekateException {
        guard.lockWrite();

        try {
            guard.becomeTerminating();
        } finally {
            guard.unlockWrite();
        }
    }

    /**
     * Terminates this service: closes the context of every initialized gateway, shuts down the timer and clears the local node
     * id, then waits until all of those asynchronous shutdowns complete.
     *
     * <p>State changes happen under the write lock, but the waiting is done only after the lock is released. Waiting while
     * still holding the write lock could stall shutdown if any of the awaited tasks has to acquire this guard in order to
     * finish (NOTE(review): message receivers are wrapped with this same guard in {@code applyGuard(...)}).</p>
     *
     * <p>Does nothing if the guard reports that the service was not switched to the terminated state by this call.</p>
     */
    @Override // io.hekate.core.service.TerminatingService
    public void terminate() {
        List<Waiting> pending = null;

        guard.lockWrite();

        try {
            if (guard.becomeTerminated()) {
                if (DEBUG) {
                    log.debug("Terminating...");
                }

                // Explicit ArrayList: the timer's shutdown handle gets appended below and
                // Collectors.toList() gives no guarantee that its result is mutable.
                pending = new ArrayList<>();

                // Close all gateways that were initialized (i.e. have a context).
                for (MessagingGateway<?> gateway : gateways.values()) {
                    MessagingGatewayContext<?> context = gateway.context();

                    if (context != null) {
                        pending.add(context.close());
                    }
                }

                // Shutdown the timer.
                if (timer != null) {
                    pending.add(AsyncUtils.shutdown(timer));

                    timer = null;
                }

                nodeId = null;
            }
        } finally {
            guard.unlockWrite();
        }

        // Await outside of the lock so that in-flight tasks that need the guard can still complete.
        if (pending != null) {
            Waiting.awaitAll(pending).awaitUninterruptedly();

            if (DEBUG) {
                log.debug("Terminated.");
            }
        }
    }

    /**
     * Returns the channels of all registered gateways.
     *
     * <p>The read lock is acquired with a state check and is released before returning.</p>
     *
     * @return New list with one channel per registered gateway.
     */
    @Override // io.hekate.messaging.MessagingService
    public List<MessagingChannel<?>> allChannels() {
        guard.lockReadWithStateCheck();

        try {
            List<MessagingChannel<?>> channels = new ArrayList<>(gateways.size());

            for (MessagingGateway<?> gateway : gateways.values()) {
                channels.add(gateway.context().channel());
            }

            return channels;
        } finally {
            guard.unlockRead();
        }
    }

    /**
     * Returns the channel with the given name without checking its base type.
     *
     * @param str Channel name.
     *
     * @return Channel.
     *
     * @throws IllegalArgumentException Declared by the service contract (see the two-argument overload for the checks).
     */
    @Override // io.hekate.messaging.MessagingService
    public DefaultMessagingChannel<Object> channel(String str) throws IllegalArgumentException {
        // A null type disables the base type compatibility check of the two-argument overload.
        return channel(str, null);
    }

    /**
     * Returns the channel with the given name, optionally verifying that it supports the requested message type.
     *
     * <p>The read lock is released in a {@code finally} block (same pattern as {@code allChannels()}), so it is released
     * exactly once on every exit path.</p>
     *
     * @param str Channel name (must not be null).
     * @param cls Requested message type, or {@code null} to skip the type compatibility check.
     * @param <T> Requested message type.
     *
     * @return Channel.
     *
     * @throws IllegalArgumentException Declared by the service contract; the name and channel existence are verified via
     * {@code ArgAssert}.
     * @throws ClassCastException If the channel's base type is not assignable from the requested type.
     */
    @Override // io.hekate.messaging.MessagingService
    public <T> DefaultMessagingChannel<T> channel(String str, Class<T> cls) throws IllegalArgumentException {
        ArgAssert.notNull(str, "Channel name");

        guard.lockReadWithStateCheck();

        try {
            MessagingGateway<?> gateway = gateways.get(str);

            ArgAssert.check(gateway != null, "No such channel [name=" + str + ']');

            if (cls != null && !gateway.baseType().isAssignableFrom(cls)) {
                throw new ClassCastException("Messaging channel doesn't support the specified type [channel-type=" + gateway.baseType().getName() + ", requested-type=" + cls.getName() + ']');
            }

            // Compatibility with the requested type was verified above (or deliberately skipped for a null type).
            @SuppressWarnings("unchecked")
            DefaultMessagingChannel<T> channel = (DefaultMessagingChannel<T>) gateway.context().channel();

            return channel;
        } finally {
            guard.unlockRead();
        }
    }

    /**
     * Checks whether a channel with the given name is registered.
     *
     * <p>Reads the gateway registry directly, without acquiring the state guard.</p>
     *
     * @param str Channel name.
     *
     * @return {@code true} if a gateway is registered under this name.
     */
    @Override // io.hekate.messaging.MessagingService
    public boolean hasChannel(String str) {
        boolean registered = gateways.containsKey(str);

        return registered;
    }

    /**
     * Validates a channel configuration and registers a new gateway for it.
     *
     * <p>Validation covers the channel name, base type, partitions, back pressure watermarks, name uniqueness and the
     * compatibility of the channel's base type with the base type of its message codec.</p>
     *
     * @param messagingChannelConfig Channel configuration.
     * @param list Global interceptors that are passed to the new gateway.
     * @param <T> Base type of channel messages.
     */
    private <T> void register(MessagingChannelConfig<T> messagingChannelConfig, List<MessageInterceptor> list) {
        ConfigCheck check = ConfigCheck.get(MessagingChannelConfig.class);

        // Basic properties.
        check.notEmpty(messagingChannelConfig.getName(), "name");
        check.validSysName(messagingChannelConfig.getName(), "name");
        check.notNull(messagingChannelConfig.getBaseType(), "base type");
        check.positive(messagingChannelConfig.getPartitions(), "partitions");
        check.isPowerOfTwo(messagingChannelConfig.getPartitions(), "partitions size");

        // Back pressure (optional).
        MessagingBackPressureConfig pressure = messagingChannelConfig.getBackPressure();

        if (pressure != null) {
            int outHigh = pressure.getOutHighWatermark();
            int outLow = pressure.getOutLowWatermark();
            int inHigh = pressure.getInHighWatermark();
            int inLow = pressure.getInLowWatermark();
            MessagingOverflowPolicy outPolicy = pressure.getOutOverflowPolicy();

            check.notNull(outPolicy, "outbound queue overflow policy");

            // Outbound watermarks are verified only if the overflow policy is not IGNORE.
            boolean outboundChecked = outPolicy != MessagingOverflowPolicy.IGNORE;

            if (outboundChecked) {
                check.positive(outHigh, "outbound queue high watermark");
                check.that(outHigh > outLow, "outbound queue high watermark must be greater than low watermark.");
            }

            // Inbound watermarks are verified only if a positive high watermark is set.
            if (inHigh > 0) {
                check.that(inHigh > inLow, "inbound queue high watermark must be greater than low watermark.");
            }
        }

        MessagingGateway<T> gateway = new MessagingGateway<>(messagingChannelConfig, cluster, codec, list);

        check.unique(gateway.name(), gateways.keySet(), "name");

        // The codec's base type must be the same as or a super type of the channel's base type.
        Class<?> codecType = gateway.codecFactory().createCodec().baseType();

        check.isTrue(codecType.isAssignableFrom(messagingChannelConfig.getBaseType()), "channel type must be a sub-class of message codec type [channel-type=" + messagingChannelConfig.getBaseType().getName() + ", codec-type=" + codecType.getName() + ']');

        gateways.put(gateway.name(), gateway);
    }

    /**
     * Builds the runtime context of a gateway and initializes the gateway with it.
     *
     * <p>Creates the network connector, the messaging executor (asynchronous if the gateway has worker threads, synchronous
     * otherwise) and metrics, schedules the periodic idle connections check if an idle socket timeout is configured and
     * registers a JMX bean if the JMX service is available.</p>
     *
     * <p>When assertions are enabled, the calling thread is required to hold the guard's write lock.</p>
     *
     * @param messagingGateway Gateway to initialize.
     * @param meterRegistry Registry that is passed to the gateway's metrics.
     * @param <T> Base type of channel messages.
     *
     * @throws HekateException Declared for failures of the underlying services.
     */
    private <T> void initializeGateway(MessagingGateway<T> messagingGateway, MeterRegistry meterRegistry) throws HekateException {
        if (messagingGateway == null && !$assertionsDisabled) {
            throw new AssertionError("Channel gateway is null.");
        }

        if (!$assertionsDisabled && !guard.isWriteLocked()) {
            throw new AssertionError("Thread must hold a write lock.");
        }

        if (DEBUG) {
            log.debug("Creating a new messaging gateway [context={}]", messagingGateway);
        }

        NetworkConnector<T> connector = net.connector(messagingGateway.name());

        // Asynchronous executor if worker threads are configured, synchronous otherwise.
        final MessagingExecutor executor;

        if (messagingGateway.workerThreads() > 0) {
            executor = new MessagingExecutorAsync(messagingGateway.workerThreads(), newThreadFactory(messagingGateway.name()));
        } else {
            executor = new MessagingExecutorSync(newThreadFactory(messagingGateway.name()));
        }

        MessagingMetrics metrics = new MessagingMetrics(messagingGateway.name(), executor::activeTasks, executor::completedTasks, meterRegistry);

        MessagingGatewayContext<T> context = new MessagingGatewayContext<>(
            messagingGateway.name(),
            messagingGateway.baseType(),
            connector,
            cluster.localNode(),
            applyGuard(messagingGateway.unguardedReceiver()),
            executor,
            timer,
            metrics,
            messagingGateway.receivePressureGuard(),
            messagingGateway.sendPressureGuard(),
            messagingGateway.interceptors(),
            messagingGateway.log(),
            messagingGateway.idleSocketTimeout() > 0,
            messagingGateway.messagingTimeout(),
            messagingGateway.warnOnRetry(),
            messagingGateway.baseRetryPolicy(),
            messagingGateway.rootChannel()
        );

        long idleTimeout = messagingGateway.idleSocketTimeout();

        if (idleTimeout > 0) {
            if (DEBUG) {
                log.debug("Scheduling new task for idle channel handling [check-interval={}]", idleTimeout);
            }

            // The task's return value is false once the context is closed.
            timer.repeatWithFixedDelay(() -> {
                try {
                    context.checkIdleConnections();
                } catch (Error | RuntimeException e) {
                    log.error("Got an unexpected error while checking for idle connections [channel={}]", messagingGateway.name(), e);
                }

                return !context.isClosed();
            }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
        }

        messagingGateway.init(context);

        if (jmx != null) {
            jmx.register(new DefaultMessagingChannelJmx(messagingGateway), context.name());
        }
    }

    /**
     * Wraps the given receiver into a {@code GuardedMessageReceiver} that is bound to this service's state guard.
     *
     * @param messageReceiver Receiver to wrap (can be null).
     * @param <T> Base type of messages.
     *
     * @return Wrapped receiver, or {@code null} if the given receiver is {@code null}.
     */
    private <T> MessageReceiver<T> applyGuard(MessageReceiver<T> messageReceiver) {
        if (messageReceiver != null) {
            return new GuardedMessageReceiver(guard, messageReceiver);
        }

        return null;
    }

    /**
     * Builds the network connector configuration for the given gateway.
     *
     * <p>The connector uses the gateway's name as its protocol and the gateway's log category. Its message codec wraps the
     * gateway's own codec into a {@code MessagingProtocolCodec}. A server handler is installed only if the gateway has a
     * receiver.</p>
     *
     * @param messagingGateway Gateway (non-null; verified only when assertions are enabled).
     * @param <T> Base type of channel messages.
     *
     * @return Connector configuration.
     */
    private <T> NetworkConnectorConfig<MessagingProtocol> networkConfigFor(MessagingGateway<T> messagingGateway) {
        if (!$assertionsDisabled && messagingGateway == null) {
            throw new AssertionError("Channel gateway is null.");
        }
        NetworkConnectorConfig<MessagingProtocol> networkConnectorConfig = new NetworkConnectorConfig<>();
        networkConnectorConfig.setProtocol(messagingGateway.name());
        networkConnectorConfig.setLogCategory(messagingGateway.logCategory());
        CodecFactory<T> codecFactory = messagingGateway.codecFactory();
        // Each invocation of the codec supplier creates a fresh codec from the gateway's codec factory.
        networkConnectorConfig.setMessageCodec(() -> {
            return new MessagingProtocolCodec(codecFactory.createCodec());
        });
        // A non-positive NIO threads value leaves the connector's own setting untouched.
        if (messagingGateway.nioThreads() > 0) {
            networkConnectorConfig.setNioThreads(messagingGateway.nioThreads());
        }
        // Only gateways with a receiver accept incoming connections.
        if (messagingGateway.hasReceiver()) {
            networkConnectorConfig.setServerHandler(new NetworkServerHandler<MessagingProtocol>() { // from class: io.hekate.messaging.internal.DefaultMessagingService.1
                @Override // io.hekate.network.NetworkServerHandler
                public void onConnect(MessagingProtocol messagingProtocol, NetworkEndpoint<MessagingProtocol> networkEndpoint) {
                    // NOTE(review): assumes that the login message is always a Connect message - confirm against the network layer.
                    MessagingProtocol.Connect connect = (MessagingProtocol.Connect) messagingProtocol;
                    // Reject connections that are addressed to another node (also rejects when nodeId is null, i.e. after terminate()).
                    if (!connect.to().equals(DefaultMessagingService.this.nodeId)) {
                        networkEndpoint.disconnect();
                        return;
                    }
                    // Reject connections for an unknown protocol (gateway name).
                    MessagingGateway messagingGateway2 = (MessagingGateway) DefaultMessagingService.this.gateways.get(networkEndpoint.protocol());
                    if (messagingGateway2 == null) {
                        networkEndpoint.disconnect();
                        return;
                    }
                    // Reject connections if the gateway has no context (the context is passed to the gateway in initializeGateway()).
                    MessagingGatewayContext context = messagingGateway2.context();
                    if (context == null) {
                        networkEndpoint.disconnect();
                        return;
                    }
                    MessagingConnectionIn messagingConnectionIn = new MessagingConnectionIn(networkEndpoint, new DefaultMessagingEndpoint(connect.from(), context.channel()), context);
                    // The context decides whether the new inbound connection is accepted.
                    if (!context.register(messagingConnectionIn)) {
                        networkEndpoint.disconnect();
                    } else {
                        // Attach the connection to the endpoint so that onMessage()/onDisconnect() can find it.
                        networkEndpoint.setContext(messagingConnectionIn);
                        messagingConnectionIn.onConnect();
                    }
                }

                @Override // io.hekate.network.NetworkServerHandler
                public void onMessage(NetworkMessage<MessagingProtocol> networkMessage, NetworkEndpoint<MessagingProtocol> networkEndpoint) throws IOException {
                    // Messages of endpoints without an attached connection are silently ignored.
                    MessagingConnectionIn messagingConnectionIn = (MessagingConnectionIn) networkEndpoint.getContext();
                    if (messagingConnectionIn != null) {
                        messagingConnectionIn.receive(networkMessage, networkEndpoint);
                    }
                }

                @Override // io.hekate.network.NetworkServerHandler
                public void onDisconnect(NetworkEndpoint<MessagingProtocol> networkEndpoint) {
                    // Endpoints that were rejected in onConnect() have no attached connection.
                    MessagingConnectionIn messagingConnectionIn = (MessagingConnectionIn) networkEndpoint.getContext();
                    if (messagingConnectionIn != null) {
                        messagingConnectionIn.onDisconnect();
                    }
                }
            });
        }
        return networkConnectorConfig;
    }

    /**
     * Cluster event listener that asks every gateway context to check for topology changes.
     *
     * <p>Does nothing unless the guard reports the initialized state.</p>
     *
     * @param clusterEvent Cluster event (non-null; verified only when assertions are enabled).
     */
    private void updateTopology(ClusterEvent clusterEvent) {
        if (clusterEvent == null && !$assertionsDisabled) {
            throw new AssertionError("Topology is null.");
        }

        guard.lockRead();

        try {
            if (!guard.isInitialized()) {
                return;
            }

            for (MessagingGateway<?> gateway : gateways.values()) {
                gateway.context().checkTopologyChanges();
            }
        } finally {
            guard.unlockRead();
        }
    }

    /**
     * Creates a scheduler with a core pool size of one whose cancelled tasks are removed from its queue.
     *
     * @return New scheduler.
     */
    private static ExtendedScheduledExecutor newTimer() {
        HekateThreadFactory threads = newThreadFactory("Timer");

        ExtendedScheduledExecutor scheduler = new ExtendedScheduledExecutor(1, threads);

        scheduler.setRemoveOnCancelPolicy(true);

        return scheduler;
    }

    /**
     * Creates a thread factory whose name is the given suffix prepended with {@code "Messaging-"}.
     *
     * @param name Suffix of the thread factory name.
     *
     * @return New thread factory.
     */
    private static HekateThreadFactory newThreadFactory(String name) {
        String threadName = "Messaging-" + name;

        return new HekateThreadFactory(threadName);
    }

    /**
     * Returns a textual summary that lists names of channels without a receiver ({@code client-channels}) and names of
     * channels with a receiver ({@code server-channels}).
     *
     * @return String representation of this service.
     */
    @Override
    public String toString() {
        String clients = joinChannelNames(false);
        String servers = joinChannelNames(true);

        return MessagingService.class.getSimpleName() + "[client-channels=" + clients + ", server-channels=" + servers + ']';
    }

    /**
     * Joins names of those gateways whose {@code hasReceiver()} flag equals the given value into a <code>{a, b}</code> string.
     *
     * @param withReceiver Required value of the gateway's {@code hasReceiver()} flag.
     *
     * @return Comma-separated gateway names enclosed in curly braces.
     */
    private String joinChannelNames(boolean withReceiver) {
        return gateways.values().stream()
            .filter(gateway -> gateway.hasReceiver() == withReceiver)
            .map(gateway -> gateway.name())
            .collect(Collectors.joining(", ", "{", "}"));
    }

    // Initializes the blank static final fields of this class.
    static {
        // Explicit form of the flag that backs the 'if (!$assertionsDisabled && ...)' checks in this class.
        $assertionsDisabled = !DefaultMessagingService.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(DefaultMessagingService.class);
        // Must follow the 'log' assignment; the debug flag is captured once and is not re-evaluated later.
        DEBUG = log.isDebugEnabled();
    }
}
