package io.hekate.coordinate.internal;

import io.hekate.cluster.ClusterNodeFilter;
import io.hekate.cluster.ClusterService;
import io.hekate.cluster.ClusterView;
import io.hekate.cluster.event.ClusterEvent;
import io.hekate.cluster.event.ClusterEventType;
import io.hekate.codec.CodecService;
import io.hekate.coordinate.CoordinationConfigProvider;
import io.hekate.coordinate.CoordinationFuture;
import io.hekate.coordinate.CoordinationProcess;
import io.hekate.coordinate.CoordinationProcessConfig;
import io.hekate.coordinate.CoordinationService;
import io.hekate.coordinate.CoordinationServiceFactory;
import io.hekate.coordinate.internal.CoordinationProtocol;
import io.hekate.core.Hekate;
import io.hekate.core.HekateException;
import io.hekate.core.internal.util.ArgAssert;
import io.hekate.core.internal.util.ConfigCheck;
import io.hekate.core.internal.util.HekateThreadFactory;
import io.hekate.core.internal.util.StreamUtils;
import io.hekate.core.internal.util.Utils;
import io.hekate.core.service.ConfigurableService;
import io.hekate.core.service.ConfigurationContext;
import io.hekate.core.service.DependencyContext;
import io.hekate.core.service.DependentService;
import io.hekate.core.service.InitializationContext;
import io.hekate.core.service.InitializingService;
import io.hekate.core.service.TerminatingService;
import io.hekate.messaging.Message;
import io.hekate.messaging.MessagingChannel;
import io.hekate.messaging.MessagingChannelConfig;
import io.hekate.messaging.MessagingConfigProvider;
import io.hekate.messaging.MessagingService;
import io.hekate.util.StateGuard;
import io.hekate.util.async.AsyncUtils;
import io.hekate.util.async.Waiting;
import io.hekate.util.format.ToString;
import io.hekate.util.format.ToStringIgnore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hekate/coordinate/internal/DefaultCoordinationService.class */
/**
 * Default implementation of {@link CoordinationService}.
 *
 * <p>
 * The service gathers {@link CoordinationProcessConfig}s from its factory and from every {@link CoordinationConfigProvider} component,
 * contributes a single messaging channel named {@value #CHANNEL_NAME} that carries {@link CoordinationProtocol} messages, and creates
 * one {@link DefaultCoordinationProcess} per configuration when the service gets initialized.
 * </p>
 *
 * <p>
 * Access to the {@link #processes} registry is serialized through a {@link StateGuard}: lifecycle methods take the write lock while
 * lookups, message handling and topology notifications take the read lock.
 * </p>
 */
public class DefaultCoordinationService implements CoordinationService, ConfigurableService, DependentService, InitializingService, TerminatingService, MessagingConfigProvider {
    // NOTE(review): the logger category is DefaultCoordinationProcess, not this class. Looks like a copy/paste slip, but changing it
    // would alter the runtime log category, so it is kept as is - confirm before renaming.
    private static final Logger log = LoggerFactory.getLogger(DefaultCoordinationProcess.class);

    /** Snapshot of the debug flag taken once at class initialization. */
    private static final boolean DEBUG = log.isDebugEnabled();

    /** Name of the messaging channel used by all coordination processes. */
    private static final String CHANNEL_NAME = "hekate.coordination";

    /** Accepts only those cluster nodes that expose {@link CoordinationService}. */
    private static final ClusterNodeFilter HAS_SERVICE_FILTER = node -> node.hasService(CoordinationService.class);

    /** Fixed delay of the messaging channel's retry policy (taken from {@code getRetryInterval()} of the factory). */
    private final long retryDelay;

    /** Value passed to {@code withNioThreads(...)} of the messaging channel. */
    private final int nioThreads;

    /** Value passed to {@code withIdleSocketTimeout(...)} of the messaging channel. */
    private final long idleSocketTimeout;

    @ToStringIgnore
    private final StateGuard guard = new StateGuard(CoordinationService.class);

    /** Process configurations collected from the factory and from {@link CoordinationConfigProvider}s. */
    @ToStringIgnore
    private final List<CoordinationProcessConfig> processesConfig = new ArrayList<>();

    /** Live processes keyed by their trimmed name; populated by {@link #initialize} and emptied by {@link #terminate()}. */
    @ToStringIgnore
    private final Map<String, DefaultCoordinationProcess> processes = new HashMap<>();

    @ToStringIgnore
    private Hekate hekate;

    @ToStringIgnore
    private MessagingService messaging;

    @ToStringIgnore
    private ClusterView cluster;

    @ToStringIgnore
    private CodecService defaultCodec;

    /**
     * Constructs a new service from the factory settings.
     *
     * @param coordinationServiceFactory Factory that supplies the retry interval, NIO threads, idle socket timeout and the initial
     * set of process configurations. The retry interval is validated by {@link ConfigCheck} as "positive".
     */
    public DefaultCoordinationService(CoordinationServiceFactory coordinationServiceFactory) {
        assert coordinationServiceFactory != null : "Factory is null.";

        ConfigCheck.get(CoordinationServiceFactory.class).positive(coordinationServiceFactory.getRetryInterval(), "retry interval");

        this.nioThreads = coordinationServiceFactory.getNioThreads();
        this.retryDelay = coordinationServiceFactory.getRetryInterval();
        this.idleSocketTimeout = coordinationServiceFactory.getIdleSocketTimeout();

        StreamUtils.nullSafe(coordinationServiceFactory.getProcesses()).forEach(this.processesConfig::add);
    }

    /**
     * Resolves the services this one depends on. The cluster view is narrowed with {@link #HAS_SERVICE_FILTER}.
     *
     * @param dependencyContext Source of the required services.
     */
    @Override // io.hekate.core.service.DependentService
    public void resolve(DependencyContext dependencyContext) {
        this.hekate = dependencyContext.hekate();
        this.messaging = (MessagingService) dependencyContext.require(MessagingService.class);
        this.cluster = ((ClusterService) dependencyContext.require(ClusterService.class)).filter(HAS_SERVICE_FILTER);
        this.defaultCodec = (CodecService) dependencyContext.require(CodecService.class);
    }

    /**
     * Collects additional process configurations from {@link CoordinationConfigProvider} components, validates every configuration
     * (non-empty and valid system name, non-null handler, unique trimmed name) and publishes a boolean property per process name.
     *
     * @param configurationContext Context used to look up providers and to publish the per-process properties.
     */
    @Override // io.hekate.core.service.ConfigurableService
    public void configure(ConfigurationContext configurationContext) {
        // Merge configurations contributed by providers into the ones that came from the factory.
        StreamUtils.nullSafe(configurationContext.findComponents(CoordinationConfigProvider.class)).forEach(provider ->
            StreamUtils.nullSafe(provider.configureCoordination()).forEach(this.processesConfig::add)
        );

        ConfigCheck configCheck = ConfigCheck.get(CoordinationProcessConfig.class);
        HashSet<String> uniqueNames = new HashSet<>();

        this.processesConfig.forEach(cfg -> {
            configCheck.notEmpty(cfg.getName(), "name");
            configCheck.validSysName(cfg.getName(), "name");
            configCheck.notNull(cfg.getHandler(), "handler");

            // Uniqueness is checked on the trimmed name since that is the key used everywhere else.
            String name = cfg.getName().trim();

            configCheck.unique(name, uniqueNames, "name");

            uniqueNames.add(name);
        });

        // Publish one property per process; processTopologyChange(...) later checks remote nodes for this property.
        this.processesConfig.forEach(cfg ->
            configurationContext.setBoolProperty(propertyName(cfg.getName().trim()), true)
        );
    }

    /**
     * Provides the configuration of the coordination messaging channel.
     *
     * @return Empty collection if no processes are configured; otherwise a single channel configuration.
     */
    @Override // io.hekate.messaging.MessagingConfigProvider
    public Collection<MessagingChannelConfig<?>> configureMessaging() {
        if (this.processesConfig.isEmpty()) {
            return Collections.emptyList();
        }

        // Process name -> codec factory (process-specific one, or the default codec if none is configured).
        // NOTE(review): kept as a raw map because the codec factory type is not imported in this file.
        HashMap processCodecs = new HashMap();

        this.processesConfig.forEach(cfg -> {
            String name = cfg.getName().trim();

            if (cfg.getMessageCodec() == null) {
                processCodecs.put(name, this.defaultCodec.codecFactory());
            } else {
                processCodecs.put(name, cfg.getMessageCodec());
            }
        });

        return Collections.singleton(MessagingChannelConfig.of(CoordinationProtocol.class)
            .withName(CHANNEL_NAME)
            .withClusterFilter(HAS_SERVICE_FILTER)
            .withNioThreads(this.nioThreads)
            .withIdleSocketTimeout(this.idleSocketTimeout)
            .withRetryPolicy(retryPolicy -> {
                retryPolicy.withFixedDelay(this.retryDelay);
            })
            .withLogCategory(CoordinationProtocol.class.getName())
            .withMessageCodec(() -> {
                return new CoordinationProtocolCodec(processCodecs);
            })
            .withReceiver(this::handleMessage));
    }

    /**
     * Switches the service to the initialized state and, if any processes are configured, subscribes to JOIN/CHANGE cluster events
     * and initializes every configured process.
     *
     * @param initializationContext Initialization context.
     *
     * @throws HekateException If a process fails to initialize.
     */
    @Override // io.hekate.core.service.InitializingService
    public void initialize(InitializationContext initializationContext) throws HekateException {
        this.guard.lockWrite();

        try {
            this.guard.becomeInitialized();

            if (DEBUG) {
                log.debug("Initializing...");
            }

            if (!this.processesConfig.isEmpty()) {
                this.cluster.addListener(this::processTopologyChange, ClusterEventType.JOIN, ClusterEventType.CHANGE);

                MessagingChannel<CoordinationProtocol> channel = this.messaging.channel(CHANNEL_NAME, CoordinationProtocol.class);

                for (CoordinationProcessConfig cfg : this.processesConfig) {
                    initializeProcess(cfg, initializationContext, channel);
                }
            }

            if (DEBUG) {
                log.debug("Initialized.");
            }
        } finally {
            this.guard.unlockWrite();
        }
    }

    /**
     * Switches the service to the terminating state and asks all processes to terminate. Waiting for the processes happens after
     * {@code withWriteLock(...)} has returned.
     *
     * @throws HekateException Declared by the {@link TerminatingService} contract.
     */
    @Override // io.hekate.core.service.TerminatingService
    public void preTerminate() throws HekateException {
        Waiting waiting = (Waiting) this.guard.withWriteLock(() -> {
            if (!this.guard.becomeTerminating()) {
                return Waiting.NO_WAIT;
            }

            if (DEBUG) {
                log.debug("Pre-terminating.");
            }

            return terminateAllProcesses();
        });

        waiting.awaitUninterruptedly();
    }

    /**
     * Switches the service to the terminated state, asks all processes to terminate and clears the process registry.
     *
     * <p>
     * The write lock is released <b>before</b> waiting for the processes (same as in {@link #preTerminate()}), so that threads which
     * need the read lock (message receivers, topology listeners) are not blocked for the whole duration of the wait.
     * </p>
     *
     * @throws HekateException Declared by the {@link TerminatingService} contract.
     */
    @Override // io.hekate.core.service.TerminatingService
    public void terminate() throws HekateException {
        Waiting waiting = null;

        this.guard.lockWrite();

        try {
            if (this.guard.becomeTerminated()) {
                if (DEBUG) {
                    log.debug("Terminating.");
                }

                waiting = terminateAllProcesses();

                this.processes.clear();
            }
        } finally {
            this.guard.unlockWrite();
        }

        // Wait outside of the lock; null means that the state transition did not happen and there is nothing to wait for.
        if (waiting != null) {
            waiting.awaitUninterruptedly();

            if (DEBUG) {
                log.debug("Terminated.");
            }
        }
    }

    /**
     * Returns a snapshot of all registered processes.
     *
     * @return New list; later changes of the registry are not reflected in it.
     */
    @Override // io.hekate.coordinate.CoordinationService
    public List<CoordinationProcess> allProcesses() {
        this.guard.lockReadWithStateCheck();

        try {
            return new ArrayList<>(this.processes.values());
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Returns the process that is registered under the specified name.
     *
     * @param str Process name (looked up as is, without trimming).
     *
     * @return Process. If there is no such process, the failure is reported via {@link ArgAssert#check}.
     */
    @Override // io.hekate.coordinate.CoordinationService
    public CoordinationProcess process(String str) {
        this.guard.lockReadWithStateCheck();

        try {
            DefaultCoordinationProcess process = this.processes.get(str);

            ArgAssert.check(process != null, "Coordination process not configured [name=" + str + ']');

            return process;
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Checks if a process is registered under the specified name.
     *
     * @param str Process name (looked up as is, without trimming).
     *
     * @return {@code true} if the registry contains such a name.
     */
    @Override // io.hekate.coordinate.CoordinationService
    public boolean hasProcess(String str) {
        this.guard.lockReadWithStateCheck();

        try {
            return this.processes.containsKey(str);
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Shortcut for {@code process(str).future()}.
     *
     * @param str Process name.
     *
     * @return Future of the process.
     */
    @Override // io.hekate.coordinate.CoordinationService
    public CoordinationFuture futureOf(String str) {
        return process(str).future();
    }

    /**
     * Invokes {@code terminate()} on every registered process and combines the results into a single {@link Waiting}.
     * Both callers in this class invoke this method while holding the guard's write lock; the registry itself is not modified here.
     *
     * @return Aggregated waiting handle.
     */
    private Waiting terminateAllProcesses() {
        List<Waiting> pending = this.processes.values().stream()
            .map(DefaultCoordinationProcess::terminate)
            .collect(Collectors.toList());

        return Waiting.awaitAll(pending);
    }

    /**
     * Creates, registers and initializes a single process. Must be called with the guard's write lock held.
     *
     * @param coordinationProcessConfig Process configuration.
     * @param initializationContext Initialization context.
     * @param messagingChannel Coordination messaging channel.
     *
     * @throws HekateException If the process initialization completes with an {@link ExecutionException}.
     */
    private void initializeProcess(CoordinationProcessConfig coordinationProcessConfig, InitializationContext initializationContext, MessagingChannel<CoordinationProtocol> messagingChannel) throws HekateException {
        assert this.guard.isWriteLocked() : "Thread must hold write lock.";

        if (DEBUG) {
            log.debug("Registering new process [configuration={}]", coordinationProcessConfig);
        }

        String name = coordinationProcessConfig.getName().trim();

        // Each process gets its own single-threaded executor.
        DefaultCoordinationProcess process = new DefaultCoordinationProcess(
            name,
            this.hekate,
            coordinationProcessConfig.getHandler(),
            Executors.newSingleThreadExecutor(new HekateThreadFactory("Coordination-" + name)),
            messagingChannel
        );

        this.processes.put(name, process);

        // Unless async initialization is requested, the process future is registered as a sync future of the cluster context.
        if (!coordinationProcessConfig.isAsyncInit()) {
            initializationContext.cluster().addSyncFuture(process.future());
        }

        try {
            AsyncUtils.getUninterruptedly(process.initialize());
        } catch (ExecutionException e) {
            throw new HekateException("Failed to initialize coordination handler [process=" + name + ']', e.getCause());
        }
    }

    /**
     * Receiver of the coordination channel: routes the request to the target process, or replies with a reject if the service is
     * not initialized.
     *
     * @param message Incoming message.
     *
     * @throws IllegalStateException If the service is initialized but the request refers to an unknown process.
     */
    private void handleMessage(Message<CoordinationProtocol> message) {
        CoordinationProtocol.RequestBase request = (CoordinationProtocol.RequestBase) message.payload(CoordinationProtocol.RequestBase.class);

        this.guard.lockRead();

        try {
            if (!this.guard.isInitialized()) {
                if (DEBUG) {
                    log.debug("Rejecting coordination message since service is not initialized [message={}]", request);
                }

                message.reply(CoordinationProtocol.Reject.INSTANCE);

                return;
            }

            DefaultCoordinationProcess process = this.processes.get(request.processName());

            if (process == null) {
                throw new IllegalStateException("Received coordination request for unknown process: " + request);
            }

            process.processMessage(message);
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Cluster event listener: notifies every process with the event's topology, filtered down to the nodes whose
     * {@link CoordinationService} descriptor has a non-null property for that process. Does nothing if the service is not initialized.
     *
     * @param clusterEvent Cluster event.
     */
    private void processTopologyChange(ClusterEvent clusterEvent) {
        this.guard.lockRead();

        try {
            if (this.guard.isInitialized()) {
                this.processes.values().forEach(process -> {
                    process.processTopologyChange(clusterEvent.topology().filter(node -> {
                        return node.service(CoordinationService.class).property(propertyName(process.name())) != null;
                    }));
                });
            }
        } finally {
            this.guard.unlockRead();
        }
    }

    /**
     * Builds the name of the service property that marks a process as configured on a node.
     *
     * @param str Process name.
     *
     * @return {@code "process."} followed by the process name.
     */
    private static String propertyName(String str) {
        return "process." + str;
    }

    @Override
    public String toString() {
        return CoordinationService.class.getSimpleName()
            + '[' + ToString.formatProperties(this)
            + ", processes=" + Utils.toString(this.processesConfig, CoordinationProcessConfig::getName)
            + ']';
    }
}
