package org.nanoframework.concurrent.scheduler.cluster.loader;

import com.google.common.collect.Sets;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import com.orbitz.consul.ConsulException;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.cache.KVCache;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.nanoframework.commons.loader.PropertiesLoader;
import org.nanoframework.commons.support.logging.Logger;
import org.nanoframework.commons.support.logging.LoggerFactory;
import org.nanoframework.commons.util.CollectionUtils;
import org.nanoframework.commons.util.ReflectUtils;
import org.nanoframework.concurrent.scheduler.cluster.BaseClusterScheduler;
import org.nanoframework.concurrent.scheduler.cluster.ClusterScheduler;
import org.nanoframework.concurrent.scheduler.cluster.config.Configure;
import org.nanoframework.concurrent.scheduler.cluster.config.Node;
import org.nanoframework.concurrent.scheduler.cluster.config.NodeStatus;
import org.nanoframework.concurrent.scheduler.cluster.consts.ConsulSources;
import org.nanoframework.concurrent.scheduler.cluster.consts.Keys;
import org.nanoframework.concurrent.scheduler.cluster.exception.SchedulerRegistryException;
import org.nanoframework.concurrent.scheduler.cluster.storage.NodeStatusSyncScheduler;
import org.nanoframework.concurrent.scheduler.cluster.storage.listener.SchedulerListener;
import org.nanoframework.concurrent.scheduler.loader.SchedulerLoader;
import org.nanoframework.core.component.scan.ClassScanner;
import org.nanoframework.core.globals.Globals;

@Singleton
/* loaded from: input_file:org/nanoframework/concurrent/scheduler/cluster/loader/ClusterSchedulerLoader.class */
public class ClusterSchedulerLoader implements SchedulerLoader, Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSchedulerLoader.class);
    private final AtomicBoolean loaded = new AtomicBoolean(false);
    private String clusterId;
    private String nodeId;
    private Configure configure;
    private KeyValueClient kvClient;
    private SchedulerListener listener;
    private KVCache cache;
    private NodeStatusSyncScheduler nodeStatusSync;

    public void load() {
        if (Boolean.parseBoolean(System.getProperty(Keys.CLUSTER_SCHEDULER_ENABLED, "false"))) {
            init();
            if (valid()) {
                scan();
                registry(filterSchedulerClusterClasses());
                listener();
                sync();
                this.loaded.set(true);
            }
        }
    }

    protected void init() {
        Injector injector = (Injector) Globals.get(Injector.class);
        this.clusterId = (String) injector.getInstance(Key.get(String.class, Names.named(Keys.CLUSTER_ID)));
        this.nodeId = (String) injector.getInstance(Key.get(String.class, Names.named(Keys.NODE_ID)));
        this.configure = (Configure) injector.getInstance(Configure.class);
        this.kvClient = (KeyValueClient) injector.getInstance(Key.get(KeyValueClient.class, Names.named(ConsulSources.KV_SCHEDULER_CLUSTER)));
        this.listener = (SchedulerListener) injector.getInstance(SchedulerListener.class);
        this.nodeStatusSync = (NodeStatusSyncScheduler) injector.getInstance(NodeStatusSyncScheduler.class);
    }

    protected boolean valid() {
        if (this.loaded.get()) {
            LOGGER.warn("已经加载Cluster任务.");
            return false;
        }
        if (PropertiesLoader.PROPERTIES.size() != 0) {
            return true;
        }
        LOGGER.warn("无任务调度配置.");
        return false;
    }

    protected void scan() {
        PropertiesLoader.PROPERTIES.values().stream().filter(properties -> {
            return properties.get(Keys.BASE_PACKAGE) != null;
        }).forEach(properties2 -> {
            ClassScanner.scan(properties2.getProperty(Keys.BASE_PACKAGE));
        });
    }

    protected Set<Class<? extends BaseClusterScheduler>> filterSchedulerClusterClasses() {
        Set filter = ClassScanner.filter(ClusterScheduler.class);
        if (CollectionUtils.isEmpty(filter)) {
            return Collections.emptySet();
        }
        HashSet newHashSet = Sets.newHashSet();
        filter.stream().filter(cls -> {
            return BaseClusterScheduler.class.isAssignableFrom(cls);
        }).forEach(cls2 -> {
            newHashSet.add(ReflectUtils.convert(cls2));
        });
        LOGGER.info("Scheduler Cluster size: {}", new Object[]{Integer.valueOf(newHashSet.size())});
        return newHashSet;
    }

    protected void registry(Set<Class<? extends BaseClusterScheduler>> set) {
        Injector injector = (Injector) Globals.get(Injector.class);
        this.configure.setClusterId(this.clusterId);
        Node node = (Node) injector.getInstance(Node.class);
        try {
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            node.setId(MessageFormat.format(this.nodeId, hostAddress));
            node.setHost(hostAddress);
            node.setStatus(NodeStatus.LOOKING);
            node.setUptime(Long.valueOf(System.currentTimeMillis()));
            node.setLivetime(node.getUptime());
            node.setSchedulerAbility(set);
            this.configure.setCurrentNode(node);
            this.configure.addNode(node.getId(), node);
        } catch (UnknownHostException e) {
            throw new SchedulerRegistryException(e.getMessage(), e);
        }
    }

    protected void listener() {
        this.cache = KVCache.newCache(this.kvClient, this.clusterId);
        this.cache.addListener(this.listener);
        try {
            this.cache.start();
        } catch (Throwable th) {
            throw new ConsulException(th.getMessage(), th);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.nodeStatusSync.close();
        this.cache.removeListener(this.listener);
        try {
            this.cache.stop();
        } catch (Exception e) {
            LOGGER.error("停止KVCache异常, {}", new Object[]{e.getMessage()});
        }
    }

    protected void sync() {
        this.nodeStatusSync.start();
    }
}
