package work.ready.cloud.cluster;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import work.ready.cloud.ReadyCloud;
import work.ready.cloud.cluster.Cloud;
import work.ready.cloud.cluster.CloudDb;
import work.ready.cloud.cluster.elasticsearch.ReadyElasticSearchFactory;
import work.ready.cloud.cluster.elasticsearch.Version;
import work.ready.cloud.cluster.elasticsearch.artifact.ArchiveArtifact;
import work.ready.cloud.registry.base.URL;
import work.ready.core.event.GeneralEvent;
import work.ready.core.event.cloud.Event;
import work.ready.core.log.Log;
import work.ready.core.log.LogFactory;
import work.ready.core.server.Constant;
import work.ready.core.server.Ready;
import work.ready.core.tools.StrUtil;
import work.ready.core.tools.define.io.FileSystemResource;

/* loaded from: input_file:work/ready/cloud/cluster/EventHandler.class */
public class EventHandler {
    private static final Log logger = LogFactory.getLog(EventHandler.class);
    private DelayQueue<DelayedTask> eventQueue = new DelayQueue<>();
    private DelayQueue<DelayedTask> messageQueue = new DelayQueue<>();
    private List<Integer> nodeLeftEvent = new ArrayList(Arrays.asList(11, 12, 14, 100));
    private List<Integer> nodeJoinEvent = new ArrayList(Arrays.asList(10, 17));

    private void eventHandledListener(Cloud cloud) {
        Cloud.message(Cloud.cluster().forYoungest()).localListen("EventHandled", new IgniteBiPredicate<UUID, String>() { // from class: work.ready.cloud.cluster.EventHandler.1
            public boolean apply(UUID uuid, String str) {
                EventHandler.logger.info("Youngest node received message [msg=" + str.replace("%", "%%") + ", from Oldest=" + uuid.toString().replace("%", "%%") + "]", new Object[0]);
                Ready.post(new GeneralEvent(Event.MESSAGE_FROM_OLDEST).put("EventHandled", new DelayedTask(str, 60000L)));
                return true;
            }
        });
    }

    public void listen(final Cloud cloud) {
        final ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(new CloudThreadFactory());
        Ready.shutdownHook.add(6, j -> {
            newCachedThreadPool.shutdown();
        });
        Ready.eventManager().addListener(this, "messageFromOldest", listenerSetter -> {
            listenerSetter.addName(Event.MESSAGE_FROM_OLDEST).setAsync(true);
        });
        Ready.eventManager().addListener(this, "elasticSearchInit", listenerSetter2 -> {
            listenerSetter2.addName(Event.READY_WORK_CLOUD_AFTER_INIT).setAsync(false);
        });
        eventHandledListener(cloud);
        Cloud.events(Cloud.cluster().forServers()).localListen(new IgnitePredicate<org.apache.ignite.events.Event>() { // from class: work.ready.cloud.cluster.EventHandler.2
            public boolean apply(org.apache.ignite.events.Event event) {
                if (EventHandler.logger.isDebugEnabled()) {
                    EventHandler.logger.debug("Received node event: " + event.toString().replace("%", "%%"), new Object[0]);
                }
                ExecutorService executorService = newCachedThreadPool;
                Cloud cloud2 = cloud;
                executorService.submit(() -> {
                    if (EventHandler.this.nodeLeftEvent.contains(Integer.valueOf(event.type()))) {
                        EventHandler.this.handleNodeLeft(cloud2, event);
                    } else if (EventHandler.this.nodeJoinEvent.contains(Integer.valueOf(event.type()))) {
                        EventHandler.this.handleNodeJoin(cloud2, event);
                    }
                });
                return true;
            }
        }, new int[]{10, 11, 12, 14, 16, 17, 100});
        if (!Cloud.cluster().state().active()) {
            cloudActivate(cloud);
        }
        afterCloudActive(cloud);
        eventsHandleEnsure();
    }

    private void cloudActivate(Cloud cloud) {
        boolean active;
        String str = null;
        boolean z = false;
        if (ReadyCloud.getNodeType().equals(Cloud.NodeType.APPLICATION_WITH_OLTP)) {
            str = Ready.getProperty(Cloud.DATA_OLTP_NODES_PROPERTY);
            if (str == null) {
                str = ReadyCloud.getConfig().getDataOltpNodes();
            }
        }
        ArrayList arrayList = StrUtil.isBlank(str) ? null : new ArrayList(Arrays.asList(StrUtil.split(str, ",")));
        if (arrayList != null) {
            String property = Ready.getProperty(Cloud.DATA_OLTP_FORCE_ACTIVE_PROPERTY);
            if (property != null && "true".equals(property.toLowerCase())) {
                z = true;
            }
            if (logger.isWarnEnabled()) {
                logger.warn("Baseline Topology is waiting for other nodes to join: ", new Object[0]);
            }
        } else if (logger.isWarnEnabled()) {
            logger.warn("Baseline Topology is waiting for activation.", new Object[0]);
        }
        try {
            StringBuilder sb = new StringBuilder();
            do {
                active = Cloud.cluster().state().active();
                if (!active) {
                    List list = (List) Cloud.cluster().nodes().stream().map(clusterNode -> {
                        return clusterNode.consistentId().toString();
                    }).collect(Collectors.toList());
                    sb.delete(0, sb.length());
                    sb.append("Current online nodes: ");
                    sb.append(Cloud.cluster().nodes().stream().map(clusterNode2 -> {
                        return clusterNode2.consistentId() + clusterNode2.addresses().toString().replace("%", "%%");
                    }).collect(Collectors.toList()));
                    if (Cloud.cluster().currentBaselineTopology() != null && !z) {
                        List list2 = (List) Cloud.cluster().currentBaselineTopology().stream().map(baselineNode -> {
                            return baselineNode.consistentId().toString();
                        }).collect(Collectors.toList());
                        sb.append(", waiting for nodes: ");
                        sb.append(list2.stream().filter(str2 -> {
                            return !list.contains(str2);
                        }).collect(Collectors.toList()));
                    } else if (arrayList == null) {
                        sb.append(", waiting for other nodes.");
                    } else if (list.containsAll(arrayList)) {
                        for (ClusterNode clusterNode3 : (List) Cloud.cluster().nodes().stream().filter(clusterNode4 -> {
                            return arrayList.contains(clusterNode4.consistentId().toString());
                        }).collect(Collectors.toList())) {
                            if (clusterNode3.attribute(Cloud.READY_CLOUD_DATA_OLTP_NODE) == null) {
                                cloud.shutdownAllNodes();
                                throw new RuntimeException(clusterNode3.consistentId() + clusterNode3.addresses() + " is not a valid DATA_OLTP node");
                            }
                        }
                        System.err.println("==active==");
                        sb.append(", all nodes are online, the Baseline Topology is going to be active.");
                        Cloud.cluster().active(true);
                        if (Cloud.cluster().currentBaselineTopology() == null) {
                        }
                    } else {
                        sb.append(", waiting for nodes: ");
                        sb.append(arrayList.stream().filter(str3 -> {
                            return !list.contains(str3);
                        }).collect(Collectors.toList()));
                    }
                    logger.warn(sb.toString(), new Object[0]);
                    Thread.sleep(1000L);
                }
            } while (!active);
            if (logger.isWarnEnabled()) {
                logger.warn("Cloud Baseline Topology is active !", new Object[0]);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException("InterruptedException while waiting for nodes to join the cluster.");
        }
    }

    private void afterCloudActive(Cloud cloud) {
        if (Cloud.cluster().currentBaselineTopology() != null) {
            Cloud.cluster().currentBaselineTopology().stream().filter(baselineNode -> {
                return baselineNode.attribute(Cloud.READY_CLOUD_DATA_OLTP_NODE) != null;
            }).findFirst().ifPresent(baselineNode2 -> {
                if (baselineNode2.consistentId().equals(Cloud.cluster().localNode().consistentId())) {
                    Cloud.cluster().baselineAutoAdjustEnabled(ReadyCloud.getConfig().isAutoRebalance());
                    Cloud.cluster().baselineAutoAdjustTimeout(ReadyCloud.getConfig().getRebalanceTimeOut());
                    Cloud.addCacheConfiguration(cloud.newCacheConfig(CloudDb.TableMode.REPLICATED_PERSISTENCE.name(), false, true, 0, true).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT));
                    Cloud.addCacheConfiguration(cloud.newCacheConfig(CloudDb.TableMode.PARTITIONED_PERSISTENCE.name(), true, true, 1, true).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT));
                }
            });
        }
        if (Cloud.cluster().forOldest().node().equals(Cloud.cluster().localNode())) {
            Cloud.addCacheConfiguration(cloud.newCacheConfig(CloudDb.TableMode.REPLICATED_MEMORY.name(), false, true, 0, false).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT));
            Cloud.addCacheConfiguration(cloud.newCacheConfig(CloudDb.TableMode.PARTITIONED_MEMORY.name(), true, true, 1, false).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT));
        }
        if (!ReadyCloud.getConfig().isDataNodeAutoJoin() || Cloud.cluster().currentBaselineTopology() == null || Cloud.cluster().localNode().attribute(Cloud.READY_CLOUD_DATA_OLTP_NODE) == null || !Cloud.cluster().currentBaselineTopology().stream().map((v0) -> {
            return v0.consistentId();
        }).noneMatch(obj -> {
            return obj.equals(Cloud.cluster().localNode().consistentId());
        })) {
            return;
        }
        Cloud.cluster().currentBaselineTopology().add(Cloud.cluster().localNode());
        if (logger.isWarnEnabled()) {
            logger.warn(Cloud.cluster().localNode().consistentId() + Cloud.cluster().localNode().addresses() + " is automatically joined Baseline Topology.", new Object[0]);
        }
        if (Cloud.cluster().isBaselineAutoAdjustEnabled()) {
            if (logger.isWarnEnabled()) {
                logger.warn(Cloud.cluster().localNode().consistentId() + Cloud.cluster().localNode().addresses() + " is going to handle data in " + (ReadyCloud.getConfig().getRebalanceTimeOut() / 1000) + " seconds.", new Object[0]);
            }
        } else if (logger.isWarnEnabled()) {
            logger.warn(Cloud.cluster().localNode().consistentId() + Cloud.cluster().localNode().addresses() + " joined Baseline Topology, but not going to store persistence data until rebalanced.", new Object[0]);
        }
    }

    public void messageFromOldest(GeneralEvent generalEvent) {
        this.messageQueue.add((DelayQueue<DelayedTask>) generalEvent.get("EventHandled"));
    }

    public void elasticSearchInit(GeneralEvent generalEvent) {
        if (Cloud.cluster().currentBaselineTopology() == null || !ReadyCloud.getNodeType().equals(Cloud.NodeType.APPLICATION_WITH_OLAP)) {
            return;
        }
        String property = Ready.getProperty(Cloud.DATA_OLAP_NODES_PROPERTY);
        if (property == null) {
            property = ReadyCloud.getConfig().getDataOlapNodes();
        }
        ArrayList arrayList = StrUtil.isBlank(property) ? null : new ArrayList(Arrays.asList(StrUtil.split(property, ",")));
        final ReadyElasticSearchFactory readyElasticSearchFactory = new ReadyElasticSearchFactory();
        if (ReadyCloud.getConfig().getElasticSearchSettings() != null) {
            readyElasticSearchFactory.setConfigProperties(ReadyCloud.getConfig().getElasticSearchSettings());
        }
        readyElasticSearchFactory.setWorkingDirectory(Ready.root().resolve("elasticsearch").resolve("workspace").toAbsolutePath());
        Path absolutePath = Ready.root().resolve("elasticsearch").resolve(Cloud.getConsistentId()).resolve("data").toAbsolutePath();
        Path absolutePath2 = Ready.root().resolve("elasticsearch").resolve(Cloud.getConsistentId()).resolve("logs").toAbsolutePath();
        readyElasticSearchFactory.setConfigProperty("path.data", absolutePath.toString());
        readyElasticSearchFactory.setConfigProperty("path.logs", absolutePath2.toString());
        readyElasticSearchFactory.setClusterName(ReadyCloud.getConfig().getOlapClusterName());
        readyElasticSearchFactory.setDefaultVersion(ReadyCloud.getConfig().getElasticSearchVersion());
        readyElasticSearchFactory.setDownloadUrls(ReadyCloud.getConfig().getElasticSearchDownloadUrls());
        readyElasticSearchFactory.setName(Cloud.getConsistentId());
        readyElasticSearchFactory.setAddress(Cloud.getBindIp());
        String publishIp = Cloud.getPublishIp();
        if (publishIp != null) {
            readyElasticSearchFactory.setPublishAddress(publishIp);
        }
        readyElasticSearchFactory.setInitialMasterNodes(arrayList);
        List<URL> discoverAll = Cloud.discoverAll(Cloud.NodeType.APPLICATION_WITH_OLAP.getType(), Constant.PROTOCOL_DEFAULT, Cloud.OLAP_SERVICE_ID, Ready.getBootstrapConfig().getActiveProfile());
        if (discoverAll == null || discoverAll.size() <= 0) {
            readyElasticSearchFactory.setInitialMasterNodes(readyElasticSearchFactory.getName());
        } else {
            readyElasticSearchFactory.setSeedHosts((List<String>) discoverAll.stream().filter(url -> {
                return url.getParameters().get("clusterName").equals(readyElasticSearchFactory.getClusterName());
            }).map(url2 -> {
                return url2.getParameters().get("address") + ":" + url2.getParameters().get("tcpPort");
            }).collect(Collectors.toList()));
        }
        if (Files.exists(Ready.config().getMainConfigPath(), new LinkOption[0])) {
            final String osType = ReadyElasticSearchFactory.getOsType();
            final Pattern compile = Pattern.compile("elasticsearch-([\\d\\.]+)-" + osType + "-x86_64.tar.gz");
            try {
                Files.walkFileTree(Ready.config().getMainConfigPath(), new SimpleFileVisitor<Path>() { // from class: work.ready.cloud.cluster.EventHandler.3
                    @Override // java.nio.file.SimpleFileVisitor, java.nio.file.FileVisitor
                    public FileVisitResult visitFile(Path path, BasicFileAttributes basicFileAttributes) {
                        Matcher matcher = compile.matcher(path.getFileName().toString());
                        if (matcher.matches()) {
                            boolean z = true;
                            if (readyElasticSearchFactory.getArtifact() != null && ((ArchiveArtifact) readyElasticSearchFactory.getArtifact()).getVersion().compareTo(Version.of(osType, matcher.group(1))) >= 0) {
                                z = false;
                            }
                            if (z) {
                                readyElasticSearchFactory.setArtifact(new ArchiveArtifact(Version.of(osType, matcher.group(1)), new FileSystemResource(path)));
                                if (matcher.group(1).equals(ReadyCloud.getConfig().getElasticSearchVersion())) {
                                    return FileVisitResult.TERMINATE;
                                }
                            }
                        }
                        return FileVisitResult.CONTINUE;
                    }
                });
            } catch (IOException e) {
            }
        }
        readyElasticSearchFactory.create().start();
    }

    private void eventsHandleEnsure() {
        Executors.newSingleThreadExecutor().execute(() -> {
            while (true) {
                try {
                    DelayedTask take = this.eventQueue.take();
                    if (this.messageQueue.contains(take)) {
                        this.messageQueue.remove(take);
                        if (logger.isInfoEnabled()) {
                            logger.info("Youngest node ensured that oldest node already handled event: " + take.getName().replace("%", "%%"), new Object[0]);
                        }
                    } else {
                        if (logger.isWarnEnabled()) {
                            logger.warn("Oldest node seems missed event: " + take.getName().replace("%", "%%") + ", youngest node is trying to handle it.", new Object[0]);
                        }
                        DiscoveryEvent discoveryEvent = (DiscoveryEvent) take.getObject();
                        if (this.nodeLeftEvent.contains(Integer.valueOf(discoveryEvent.type()))) {
                            Ready.post(new GeneralEvent(Event.NODE_UNAVAILABLE).put("node", discoveryEvent.eventNode()));
                        }
                    }
                } catch (Exception e) {
                    logger.info(e, e.getMessage(), new Object[0]);
                }
            }
        });
        Executors.newSingleThreadExecutor().execute(() -> {
            while (true) {
                try {
                    DelayedTask take = this.messageQueue.take();
                    if (logger.isWarnEnabled()) {
                        logger.warn("Received message from Oldest node: " + take.getName().replace("%", "%%") + " for insuring, but youngest node doesn't know such event", new Object[0]);
                    }
                } catch (Exception e) {
                    logger.info(e, e.getMessage(), new Object[0]);
                }
            }
        });
    }

    private void handleNodeJoin(Cloud cloud, org.apache.ignite.events.Event event) {
        if (Cloud.cluster().forOldest().node().isLocal()) {
            System.out.println("Oldest handles a node join event: " + event.toString().replace("%", "%%"));
        } else if (Cloud.cluster().forYoungest().node().isLocal()) {
            System.out.println("Youngest received a node join event: " + event.toString().replace("%", "%%"));
        }
    }

    private void handleNodeLeft(Cloud cloud, org.apache.ignite.events.Event event) {
        if (Cloud.cluster().forOldest().node().isLocal()) {
            if (logger.isInfoEnabled()) {
                logger.info("Oldest node handles a node left event: " + event.toString().replace("%", "%%"), new Object[0]);
            }
            if (event instanceof DiscoveryEvent) {
                DiscoveryEvent discoveryEvent = (DiscoveryEvent) event;
                Ready.post(new GeneralEvent(Event.NODE_UNAVAILABLE).put("node", discoveryEvent.eventNode()));
                if (Cloud.cluster().nodes().size() > 1) {
                    Cloud.message(Cloud.cluster().forYoungest()).send("EventHandled", discoveryEvent.shortDisplay());
                }
            }
            Cloud.getRegistry().startHeartbeat();
            return;
        }
        if (Cloud.cluster().forYoungest().node().isLocal()) {
            if (logger.isInfoEnabled()) {
                logger.info("Youngest node received a node left event: " + event.toString().replace("%", "%%"), new Object[0]);
            }
            if (event instanceof DiscoveryEvent) {
                DiscoveryEvent discoveryEvent2 = (DiscoveryEvent) event;
                this.eventQueue.add((DelayQueue<DelayedTask>) new DelayedTask(discoveryEvent2.shortDisplay(), discoveryEvent2, 5000L));
                if (logger.isInfoEnabled()) {
                    logger.info("Youngest node received the same event as the oldest node received: " + discoveryEvent2.eventNode().consistentId() + " left, try to ensure that the oldest node handles this event properly.", new Object[0]);
                }
            }
        }
    }
}
