package io.atomix.cluster.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.atomix.cluster.BootstrapMetadataService;
import io.atomix.cluster.ClusterEvent;
import io.atomix.cluster.ClusterEventListener;
import io.atomix.cluster.ClusterMetadata;
import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterService;
import io.atomix.cluster.CoreMetadataService;
import io.atomix.cluster.ManagedClusterService;
import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId;
import io.atomix.cluster.impl.DefaultCoreMetadataService;
import io.atomix.messaging.BroadcastService;
import io.atomix.messaging.MessagingService;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.event.AbstractListenerManager;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/cluster/impl/DefaultClusterService.class */
/**
 * Cluster membership service driven by identity broadcasts and point-to-point heartbeats.
 *
 * <p>Known nodes are kept in a concurrent map keyed by {@link NodeId}. While running, the service
 * periodically broadcasts the local node through the {@link BroadcastService}, sends a heartbeat
 * message to every other known or bootstrap node through the {@link MessagingService}, and keeps
 * one {@link PhiAccrualFailureDetector} per peer that decides whether the peer is switched between
 * {@link Node.State#ACTIVE} and {@link Node.State#INACTIVE}. Membership changes are published to
 * registered listeners as {@link ClusterEvent}s.
 *
 * <p>NOTE(review): {@link #stop()} shuts down both executors, which are {@code final} and never
 * recreated, so an instance does not look restartable after it has been stopped.
 */
public class DefaultClusterService extends AbstractListenerManager<ClusterEvent, ClusterEventListener> implements ManagedClusterService {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusterService.class);

    /** Delay between two heartbeat rounds, in milliseconds. */
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
    /** Phi value at or above which a peer is considered failed. */
    private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
    /** Milliseconds since the detector's last update after which a peer with phi == 0 is considered failed. */
    private static final long DEFAULT_FAILURE_TIME = 10000;
    /** Message subject used for both sending and handling heartbeats. */
    private static final String HEARTBEAT_MESSAGE = "atomix-cluster-heartbeat";

    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder()
            .register(KryoNamespaces.BASIC)
            .nextId(500)
            .register(NodeId.class)
            .register(Node.Type.class)
            .register(Node.State.class)
            .register(ClusterHeartbeat.class)
            .register(StatefulNode.class)
            .register(new DefaultCoreMetadataService.AddressSerializer(), Address.class)
            .build("ClusterService"));

    private final MessagingService messagingService;
    private final BroadcastService broadcastService;
    private final BootstrapMetadataService bootstrapMetadataService;
    private final CoreMetadataService coreMetadataService;
    private final StatefulNode localNode;
    private ScheduledFuture<?> heartbeatFuture;
    private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
    private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
    private final AtomicBoolean started = new AtomicBoolean();
    private final Map<NodeId, StatefulNode> nodes = Maps.newConcurrentMap();
    private final Map<NodeId, PhiAccrualFailureDetector> failureDetectors = Maps.newConcurrentMap();
    // Listeners are kept in fields so that stop() can unregister the very same instances.
    private final ClusterMetadataEventListener metadataEventListener = this::handleMetadataEvent;
    private final Consumer<byte[]> broadcastListener = this::handleBroadcastMessage;
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(Threads.namedThreads("atomix-cluster-heartbeat-sender", LOGGER));
    private final ExecutorService heartbeatExecutor = Executors.newSingleThreadExecutor(Threads.namedThreads("atomix-cluster-heartbeat-receiver", LOGGER));

    /**
     * Creates the service for the given local node.
     *
     * @param node the local node; its attributes are copied into an internal {@link StatefulNode}
     * @param bootstrapMetadataService source of bootstrap nodes that receive heartbeats before they are known
     * @param coreMetadataService source of core node metadata and metadata change events
     * @param messagingService transport used for heartbeat request/reply
     * @param broadcastService transport used to announce the local node
     * @throws NullPointerException if any of the services (or {@code node}) is {@code null}
     */
    public DefaultClusterService(Node node, BootstrapMetadataService bootstrapMetadataService, CoreMetadataService coreMetadataService, MessagingService messagingService, BroadcastService broadcastService) {
        this.bootstrapMetadataService = Preconditions.checkNotNull(bootstrapMetadataService, "bootstrapMetadataService cannot be null");
        this.coreMetadataService = Preconditions.checkNotNull(coreMetadataService, "coreMetadataService cannot be null");
        this.messagingService = Preconditions.checkNotNull(messagingService, "messagingService cannot be null");
        this.broadcastService = Preconditions.checkNotNull(broadcastService, "broadcastService cannot be null");
        this.localNode = toStatefulNode(node);
    }

    /** Copies the identifying attributes of {@code node} into a fresh {@link StatefulNode}. */
    private static StatefulNode toStatefulNode(Node node) {
        return new StatefulNode(node.id(), node.type(), node.address(), node.zone(), node.rack(), node.host());
    }

    /** A node is exposed through the public API when it is a CORE node or is currently ACTIVE. */
    private static boolean isVisible(StatefulNode node) {
        return node.type() == Node.Type.CORE || node.getState() == Node.State.ACTIVE;
    }

    /** Serializes a {@link ClusterHeartbeat} describing the local node. */
    private byte[] encodeLocalHeartbeat() {
        return SERIALIZER.encode(new ClusterHeartbeat(this.localNode.id(), this.localNode.type(), this.localNode.zone(), this.localNode.rack(), this.localNode.host()));
    }

    /**
     * Returns the local node.
     *
     * @return the local node
     */
    @Override
    public Node getLocalNode() {
        return this.localNode;
    }

    /**
     * Returns an immutable snapshot of the visible nodes.
     *
     * @return every known node that is a CORE node or is currently ACTIVE
     */
    @Override
    public Set<Node> getNodes() {
        return ImmutableSet.copyOf(this.nodes.values().stream()
                .filter(DefaultClusterService::isVisible)
                .collect(Collectors.toList()));
    }

    /**
     * Looks up a node by identifier.
     *
     * @param nodeId the node identifier
     * @return the node, or {@code null} if it is unknown or is a non-CORE node that is not ACTIVE
     */
    @Override
    public Node getNode(NodeId nodeId) {
        StatefulNode candidate = this.nodes.get(nodeId);
        return candidate != null && isVisible(candidate) ? candidate : null;
    }

    /** Broadcasts the serialized local node to the cluster. */
    private void broadcastIdentity() {
        this.broadcastService.broadcast(SERIALIZER.encode(this.localNode));
    }

    /**
     * Handles an identity broadcast: registers the announced node if it was not known yet, posts
     * NODE_ADDED and NODE_ACTIVATED for it, and triggers a heartbeat round.
     *
     * @param message a serialized {@link StatefulNode}
     */
    private void handleBroadcastMessage(byte[] message) {
        StatefulNode announced = SERIALIZER.decode(message);
        if (this.nodes.putIfAbsent(announced.id(), announced) == null) {
            post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, announced));
            post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, announced));
            sendHeartbeats();
        }
    }

    /**
     * Sends one heartbeat to every known node other than the local one, plus every bootstrap node
     * that is neither the local node nor already known.
     *
     * @return a future completed once every individual heartbeat has completed; individual
     *     failures are swallowed, so one unreachable peer does not fail the round
     */
    private CompletableFuture<Void> sendHeartbeats() {
        Stream<StatefulNode> knownPeers = this.nodes.values().stream()
                .filter(node -> !node.id().equals(getLocalNode().id()));
        Stream<StatefulNode> bootstrapPeers = this.bootstrapMetadataService.getMetadata().nodes().stream()
                .filter(node -> !node.id().equals(getLocalNode().id()) && !this.nodes.containsKey(node.id()))
                .map(DefaultClusterService::toStatefulNode);
        byte[] payload = encodeLocalHeartbeat();
        List<CompletableFuture<Void>> pending = Stream.concat(knownPeers, bootstrapPeers)
                .map(peer -> heartbeatPeer(peer, payload))
                .collect(Collectors.toList());
        return Futures.allOf(pending).thenApply(results -> null);
    }

    /**
     * Sends a heartbeat to a single peer and re-evaluates the peer's state from its failure detector.
     *
     * <p>The peer is treated as failed when phi has reached {@code phiFailureThreshold}, or when
     * phi is exactly zero although the detector reports a positive {@code lastUpdated()} that is
     * more than {@link #DEFAULT_FAILURE_TIME} milliseconds in the past.
     *
     * @param peer the node to contact
     * @param payload the already-serialized local heartbeat
     * @return the send future with any failure mapped to a normal {@code null} completion
     */
    private CompletableFuture<Void> heartbeatPeer(StatefulNode peer, byte[] payload) {
        LOGGER.trace("{} - Sending heartbeat: {}", this.localNode.id(), peer.id());
        CompletableFuture<Void> sent = sendHeartbeat(peer.address(), payload);
        PhiAccrualFailureDetector detector = this.failureDetectors.computeIfAbsent(peer.id(), id -> new PhiAccrualFailureDetector());
        double phi = detector.phi();
        boolean failed = phi >= this.phiFailureThreshold
                || (phi == 0.0d && detector.lastUpdated() > 0 && System.currentTimeMillis() - detector.lastUpdated() > DEFAULT_FAILURE_TIME);
        if (failed) {
            if (peer.getState() == Node.State.ACTIVE) {
                deactivateNode(peer);
            }
        } else if (peer.getState() == Node.State.INACTIVE) {
            activateNode(peer);
        }
        return sent.exceptionally(error -> null);
    }

    /**
     * Sends a heartbeat to {@code address} and processes the reply, which is decoded as a
     * collection of nodes. Every node in the reply that was not known yet is registered and
     * announced with NODE_ADDED and NODE_ACTIVATED; if any were new, another heartbeat round is
     * started so they are contacted right away.
     *
     * @param address the destination address
     * @param payload the serialized heartbeat
     * @return a future that always completes normally with {@code null}; send failures are only logged
     */
    private CompletableFuture<Void> sendHeartbeat(Address address, byte[] payload) {
        return this.messagingService.sendAndReceive(address, HEARTBEAT_MESSAGE, payload)
                .whenComplete((response, error) -> {
                    if (error != null) {
                        LOGGER.debug("{} - Sending heartbeat to {} failed", this.localNode.id(), address, error);
                        return;
                    }
                    Collection<StatefulNode> advertised = SERIALIZER.decode(response);
                    boolean discovered = false;
                    for (StatefulNode node : advertised) {
                        if (this.nodes.putIfAbsent(node.id(), node) == null) {
                            post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, node));
                            post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, node));
                            discovered = true;
                        }
                    }
                    if (discovered) {
                        sendHeartbeats();
                    }
                })
                .exceptionally(error -> null)
                .thenApply(response -> null);
    }

    /**
     * Handles an incoming heartbeat: reports it to the sender's failure detector, makes sure the
     * sender is registered and ACTIVE, and replies with the CLIENT nodes known locally.
     *
     * @param address the address the heartbeat came from
     * @param message a serialized {@link ClusterHeartbeat}
     * @return the serialized list of known nodes whose type is CLIENT
     */
    private byte[] handleHeartbeat(Address address, byte[] message) {
        ClusterHeartbeat heartbeat = SERIALIZER.decode(message);
        LOGGER.trace("{} - Received heartbeat: {}", this.localNode.id(), heartbeat.nodeId());
        this.failureDetectors.computeIfAbsent(heartbeat.nodeId(), id -> new PhiAccrualFailureDetector()).report();
        activateNode(new StatefulNode(heartbeat.nodeId(), heartbeat.nodeType(), address, heartbeat.zone(), heartbeat.rack(), heartbeat.host()));
        return SERIALIZER.encode(this.nodes.values().stream()
                .filter(node -> node.type() == Node.Type.CLIENT)
                .collect(Collectors.toList()));
    }

    /**
     * Marks a node ACTIVE.
     *
     * <p>A known INACTIVE node is switched to ACTIVE and NODE_ACTIVATED is posted; a known node in
     * any other state is left alone. An unknown node is copied, registered as ACTIVE, announced
     * with NODE_ADDED and NODE_ACTIVATED, and sent a heartbeat immediately.
     *
     * @param node the node to activate
     */
    private void activateNode(Node node) {
        StatefulNode existing = this.nodes.get(node.id());
        if (existing == null) {
            StatefulNode added = toStatefulNode(node);
            LOGGER.info("{} - Node activated: {}", this.localNode.id(), added);
            added.setState(Node.State.ACTIVE);
            this.nodes.put(added.id(), added);
            post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, added));
            post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, added));
            sendHeartbeat(node.address(), encodeLocalHeartbeat());
        } else if (existing.getState() == Node.State.INACTIVE) {
            LOGGER.info("{} - Node activated: {}", this.localNode.id(), existing);
            existing.setState(Node.State.ACTIVE);
            post(new ClusterEvent(ClusterEvent.Type.NODE_ACTIVATED, existing));
        }
    }

    /**
     * Marks a known, currently ACTIVE node INACTIVE and posts the matching events. CORE nodes only
     * get NODE_DEACTIVATED; DATA and CLIENT nodes additionally get NODE_REMOVED. The node entry
     * itself stays in the node map in both cases.
     *
     * @param node the node to deactivate
     */
    private void deactivateNode(Node node) {
        StatefulNode existing = this.nodes.get(node.id());
        if (existing == null || existing.getState() != Node.State.ACTIVE) {
            return;
        }
        LOGGER.info("{} - Node deactivated: {}", this.localNode.id(), existing);
        existing.setState(Node.State.INACTIVE);
        switch (existing.type()) {
            case CORE:
                post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existing));
                break;
            case DATA:
            case CLIENT:
                post(new ClusterEvent(ClusterEvent.Type.NODE_DEACTIVATED, existing));
                post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, existing));
                break;
            default:
                throw new AssertionError();
        }
    }

    /**
     * Reconciles the node map with updated cluster metadata: nodes listed in the metadata but not
     * yet known are added (NODE_ADDED), and CORE nodes that were known before this event but are
     * no longer listed are removed (NODE_REMOVED).
     *
     * @param event the metadata change event carrying the new {@link ClusterMetadata}
     */
    private void handleMetadataEvent(ClusterMetadataEvent event) {
        // Snapshot the known CORE ids first, before the loop below registers new nodes.
        Set<NodeId> knownCoreIds = this.nodes.entrySet().stream()
                .filter(entry -> entry.getValue().type() == Node.Type.CORE)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());

        Set<NodeId> metadataIds = Sets.newHashSet();
        for (Node node : event.subject().nodes()) {
            if (this.nodes.get(node.id()) == null) {
                StatefulNode added = toStatefulNode(node);
                this.nodes.put(added.id(), added);
                post(new ClusterEvent(ClusterEvent.Type.NODE_ADDED, added));
            }
            metadataIds.add(node.id());
        }

        for (NodeId staleId : Sets.difference(knownCoreIds, metadataIds)) {
            StatefulNode removed = this.nodes.remove(staleId);
            if (removed != null) {
                post(new ClusterEvent(ClusterEvent.Type.NODE_REMOVED, removed));
            }
        }
    }

    /**
     * Starts the service: registers the metadata and broadcast listeners, registers the local node
     * as ACTIVE, seeds the node map from the core metadata, installs the heartbeat handler, and
     * schedules the periodic broadcast/heartbeat task.
     *
     * @return a future completed with this service once the first heartbeat round has finished
     *     (successfully or not); if the service was already started the future is already
     *     completed with {@code null}
     */
    public CompletableFuture<ClusterService> start() {
        if (!this.started.compareAndSet(false, true)) {
            return CompletableFuture.completedFuture(null);
        }
        this.coreMetadataService.addListener(this.metadataEventListener);
        this.broadcastService.addListener(this.broadcastListener);
        LOGGER.info("{} - Node activated: {}", this.localNode.id(), this.localNode);
        this.localNode.setState(Node.State.ACTIVE);
        this.nodes.put(this.localNode.id(), this.localNode);
        this.coreMetadataService.getMetadata().nodes()
                .forEach(node -> this.nodes.putIfAbsent(node.id(), toStatefulNode(node)));
        this.messagingService.registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, this.heartbeatExecutor);

        ComposableFuture<Void> firstRound = new ComposableFuture<>();
        broadcastIdentity();
        // Startup completes after the first round regardless of its outcome.
        sendHeartbeats().whenComplete((result, error) -> firstRound.complete(null));
        this.heartbeatFuture = this.heartbeatScheduler.scheduleWithFixedDelay(() -> {
            broadcastIdentity();
            sendHeartbeats();
        }, 0L, this.heartbeatInterval, TimeUnit.MILLISECONDS);
        return firstRound.thenApply(ignored -> {
            LOGGER.info("Started");
            return this;
        });
    }

    /**
     * Returns whether the service is currently started.
     *
     * @return {@code true} between a successful {@link #start()} and the next {@link #stop()}
     */
    public boolean isRunning() {
        return this.started.get();
    }

    /**
     * Stops the service: shuts down both executors, marks the local node INACTIVE, clears the node
     * map, cancels the periodic task and unregisters the heartbeat handler and both listeners.
     * Calling it on a service that is not started is a no-op.
     *
     * @return an already-completed future
     */
    public CompletableFuture<Void> stop() {
        if (this.started.compareAndSet(true, false)) {
            this.heartbeatScheduler.shutdownNow();
            this.heartbeatExecutor.shutdownNow();
            LOGGER.info("{} - Node deactivated: {}", this.localNode.id(), this.localNode);
            this.localNode.setState(Node.State.INACTIVE);
            this.nodes.clear();
            // start() flips `started` before it assigns heartbeatFuture, so it may still be unset here.
            if (this.heartbeatFuture != null) {
                this.heartbeatFuture.cancel(true);
            }
            this.messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
            // Mirror of the addListener call in start(); without it broadcasts keep repopulating `nodes`.
            this.broadcastService.removeListener(this.broadcastListener);
            this.coreMetadataService.removeListener(this.metadataEventListener);
            LOGGER.info("Stopped");
        }
        return CompletableFuture.completedFuture(null);
    }
}
