package org.onosproject.store.cluster.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cfg.ConfigProperty;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.Version;
import org.onosproject.core.VersionService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true)
/* loaded from: input_file:org/onosproject/store/cluster/impl/DistributedClusterStore.class */
public class DistributedClusterStore extends AbstractStore<ClusterEvent, ClusterStoreDelegate> implements ClusterStore {
    /** Message subject used both to register the heartbeat handler and to send heartbeats. */
    public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
    // Default for the "heartbeatInterval" property (its label gives the unit as milliseconds).
    private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
    // Default for the "phiFailureThreshold" property.
    private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
    // Shared error message for null node-id arguments.
    private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
    // Created in activate(); consulted by the heartbeat sender and the heartbeat handler.
    private PhiAccrualFailureDetector failureDetector;
    // Local node and its version, both captured in activate().
    private ControllerNode localNode;
    private Version localVersion;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected VersionService versionService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterMetadataService clusterMetadataService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MessagingService messagingService;

    // Optional and dynamic: may be null at any time, see the bind/unbind methods named below.
    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, bind = "bindComponentConfigService", unbind = "unbindComponentConfigService", policy = ReferencePolicy.DYNAMIC)
    protected ComponentConfigService cfgService;
    private static final Logger log = LoggerFactory.getLogger(DistributedClusterStore.class);
    // Serializer for heartbeat payloads: the API namespace plus HeartbeatMessage registered from id 500.
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder().register(KryoNamespaces.API).nextId(500).register(new Class[]{HeartbeatMessage.class}).build("ClusterStore"));

    // Delay between heartbeat rounds, passed to scheduleWithFixedDelay in TimeUnit.MILLISECONDS.
    @Property(name = "heartbeatInterval", intValue = {DEFAULT_HEARTBEAT_INTERVAL}, label = "Interval time to send heartbeat to other controller nodes (millisecond)")
    private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;

    // A peer whose phi value is below this threshold is treated as reachable by heartbeat().
    @Property(name = "phiFailureThreshold", intValue = {DEFAULT_PHI_FAILURE_THRESHOLD}, label = "the value of Phi threshold to detect accrual failure")
    private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
    // Per-node bookkeeping, all keyed by node id.
    private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
    private final Map<NodeId, ControllerNode.State> nodeStates = Maps.newConcurrentMap();
    private final Map<NodeId, Version> nodeVersions = Maps.newConcurrentMap();
    // Time of the last state/version change recorded by updateNode().
    private final Map<NodeId, DateTime> nodeLastUpdatedTimes = Maps.newConcurrentMap();
    // Runs the periodic heartbeat task; replaced by restartHeartbeatSender() when the interval changes.
    private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/cluster/membership", "heartbeat-sender", log));
    // Executor handed to the messaging service for running the heartbeat handler.
    private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/cluster/membership", "heartbeat-receiver", log));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/store/cluster/impl/DistributedClusterStore$HeartbeatMessage.class */
    /**
     * Heartbeat payload: the sending node, its advertised state and its version.
     */
    public static class HeartbeatMessage {
        private ControllerNode source;
        private ControllerNode.State state;
        private Version version;

        /**
         * Creates a heartbeat message.
         *
         * @param controllerNode node sending the heartbeat
         * @param state          state advertised by the sender; {@code null} is stored as ACTIVE
         * @param version        version advertised by the sender
         */
        public HeartbeatMessage(ControllerNode controllerNode, ControllerNode.State state, Version version) {
            this.source = controllerNode;
            if (state == null) {
                this.state = ControllerNode.State.ACTIVE;
            } else {
                this.state = state;
            }
            this.version = version;
        }

        /**
         * Returns the node that sent this heartbeat.
         *
         * @return sending node
         */
        public ControllerNode source() {
            return source;
        }
    }

    /* loaded from: input_file:org/onosproject/store/cluster/impl/DistributedClusterStore$HeartbeatMessageHandler.class */
    /**
     * Handles incoming heartbeat messages: feeds the failure detector and
     * records the state and version advertised by the sender.
     */
    private class HeartbeatMessageHandler implements BiConsumer<Endpoint, byte[]> {
        private HeartbeatMessageHandler() {
        }

        /**
         * Decodes a heartbeat and applies it, ignoring senders that are not
         * part of the current cluster metadata.
         *
         * @param endpoint endpoint the message came from (unused)
         * @param bArr     serialized {@link HeartbeatMessage}
         */
        @Override // java.util.function.BiConsumer
        public void accept(Endpoint endpoint, byte[] bArr) {
            HeartbeatMessage message = (HeartbeatMessage) SERIALIZER.decode(bArr);
            if (!clusterMetadataService.getClusterMetadata().getNodes().contains(message.source())) {
                return;
            }
            long now = System.currentTimeMillis();
            NodeId senderId = message.source().id();
            // Only report to the detector when more than half an interval has passed since the last report.
            if (now - failureDetector.getLastHeartbeatTime(senderId) > heartbeatInterval / 2) {
                failureDetector.report(senderId, now);
            }
            updateNode(senderId, message.state, message.version);
        }
    }

    /**
     * Binds the component configuration service, registers this component's
     * properties with it and reads the current configuration. Does nothing
     * when a service is already bound.
     *
     * @param componentConfigService service being bound
     */
    protected void bindComponentConfigService(ComponentConfigService componentConfigService) {
        if (cfgService != null) {
            return;
        }
        cfgService = componentConfigService;
        cfgService.registerProperties(getClass());
        readComponentConfiguration();
    }

    /**
     * Unbinds the component configuration service if it is the one currently
     * bound, unregistering this component's properties first.
     *
     * @param componentConfigService service being unbound
     */
    protected void unbindComponentConfigService(ComponentConfigService componentConfigService) {
        if (cfgService != componentConfigService) {
            return;
        }
        cfgService.unregisterProperties(getClass(), false);
        cfgService = null;
    }

    /**
     * Activates the store: captures the local node and version, starts
     * receiving heartbeats and schedules the periodic heartbeat sender.
     */
    @Activate
    public void activate() {
        this.localNode = this.clusterMetadataService.getLocalNode();
        this.localVersion = this.versionService.version();
        this.nodeVersions.put(this.localNode.id(), this.localVersion);
        // The failure detector must exist before the handler is registered:
        // the handler dereferences it for every heartbeat it receives.
        this.failureDetector = new PhiAccrualFailureDetector();
        this.messagingService.registerHandler(HEARTBEAT_MESSAGE, new HeartbeatMessageHandler(), this.heartBeatMessageHandler);
        this.heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0L, this.heartbeatInterval, TimeUnit.MILLISECONDS);
        log.info("Started");
    }

    /**
     * Deactivates the store: stops receiving heartbeats and shuts down both
     * heartbeat executors immediately.
     */
    @Deactivate
    public void deactivate() {
        messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
        heartBeatSender.shutdownNow();
        heartBeatMessageHandler.shutdownNow();
        log.info("Stopped");
    }

    /**
     * Re-reads the component configuration after a modification.
     *
     * @param componentContext component context (not used by this method)
     */
    @Modified
    public void modified(ComponentContext componentContext) {
        readComponentConfiguration();
    }

    /**
     * Installs the store delegate.
     *
     * @param clusterStoreDelegate delegate to install; must not be null
     */
    public void setDelegate(ClusterStoreDelegate clusterStoreDelegate) {
        // checkNotNull returns its argument, so validation and assignment fold into one statement.
        this.delegate = Preconditions.checkNotNull(clusterStoreDelegate, "Delegate cannot be null");
    }

    /**
     * Clears the store delegate, regardless of which delegate is passed in.
     *
     * @param clusterStoreDelegate delegate being withdrawn (not compared against the current one)
     */
    public void unsetDelegate(ClusterStoreDelegate clusterStoreDelegate) {
        delegate = null;
    }

    /**
     * Tells whether a delegate is currently installed.
     *
     * @return true if a delegate is set
     */
    public boolean hasDelegate() {
        return delegate != null;
    }

    /**
     * Returns the local node captured at activation.
     *
     * @return local controller node
     */
    public ControllerNode getLocalNode() {
        return localNode;
    }

    /**
     * Returns an immutable snapshot of all nodes known to this store.
     *
     * @return immutable set of controller nodes
     */
    public Set<ControllerNode> getNodes() {
        return ImmutableSet.copyOf(allNodes.values());
    }

    /**
     * Looks up a node by identifier.
     *
     * @param nodeId node identifier; must not be null
     * @return the node, or null if no node is stored under that identifier
     */
    public ControllerNode getNode(NodeId nodeId) {
        Preconditions.checkNotNull(nodeId, INSTANCE_ID_NULL);
        return allNodes.get(nodeId);
    }

    /**
     * Returns the recorded state of a node.
     *
     * @param nodeId node identifier; must not be null
     * @return recorded state, or INACTIVE when no state is recorded
     */
    public ControllerNode.State getState(NodeId nodeId) {
        Preconditions.checkNotNull(nodeId, INSTANCE_ID_NULL);
        ControllerNode.State recorded = nodeStates.get(nodeId);
        return recorded != null ? recorded : ControllerNode.State.INACTIVE;
    }

    /**
     * Returns the recorded version of a node.
     *
     * @param nodeId node identifier; must not be null
     * @return recorded version, or null when none is recorded
     */
    public Version getVersion(NodeId nodeId) {
        Preconditions.checkNotNull(nodeId, INSTANCE_ID_NULL);
        return nodeVersions.get(nodeId);
    }

    /**
     * Records the local node as READY or ACTIVE.
     *
     * @param z true to mark the local node READY, false to mark it ACTIVE
     */
    public void markFullyStarted(boolean z) {
        NodeId localId = localNode.id();
        ControllerNode.State target;
        if (z) {
            target = ControllerNode.State.READY;
        } else {
            target = ControllerNode.State.ACTIVE;
        }
        updateNode(localId, target, null);
    }

    /**
     * Creates a node from the given coordinates and adds it to the store.
     *
     * @param nodeId    node identifier; must not be null
     * @param ipAddress node IP address
     * @param i         node TCP port
     * @return the newly created node
     */
    public ControllerNode addNode(NodeId nodeId, IpAddress ipAddress, int i) {
        Preconditions.checkNotNull(nodeId, INSTANCE_ID_NULL);
        ControllerNode created = new DefaultControllerNode(nodeId, ipAddress, i);
        addNode(created);
        return created;
    }

    /**
     * Removes a node together with its recorded state and version, and
     * notifies the delegate. Does nothing when the node is unknown.
     *
     * @param nodeId node identifier; must not be null
     */
    public void removeNode(NodeId nodeId) {
        Preconditions.checkNotNull(nodeId, INSTANCE_ID_NULL);
        ControllerNode removed = allNodes.remove(nodeId);
        if (removed == null) {
            return;
        }
        nodeStates.remove(nodeId);
        nodeVersions.remove(nodeId);
        notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, removed));
    }

    /**
     * Stores a node, records its initial state (ACTIVE for the local node,
     * INACTIVE otherwise) and notifies the delegate that it was added.
     *
     * @param controllerNode node to add
     */
    private void addNode(ControllerNode controllerNode) {
        NodeId id = controllerNode.id();
        allNodes.put(id, controllerNode);
        ControllerNode.State initialState;
        if (controllerNode.equals(localNode)) {
            initialState = ControllerNode.State.ACTIVE;
        } else {
            initialState = ControllerNode.State.INACTIVE;
        }
        updateNode(id, initialState, null);
        notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, controllerNode));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records a node's state and, when supplied, its version; stamps the
     * update time and raises a change notification. Nothing happens when the
     * state is unchanged and the version is either null or unchanged.
     *
     * @param nodeId  node identifier
     * @param state   new state
     * @param version new version, or null to leave the recorded version untouched
     */
    public void updateNode(NodeId nodeId, ControllerNode.State state, Version version) {
        ControllerNode.State previousState = nodeStates.get(nodeId);
        Version previousVersion = nodeVersions.get(nodeId);
        boolean changed = !Objects.equals(previousState, state)
                || (version != null && !Objects.equals(previousVersion, version));
        if (changed) {
            nodeStates.put(nodeId, state);
            if (version != null) {
                nodeVersions.put(nodeId, version);
            }
            nodeLastUpdatedTimes.put(nodeId, DateTime.now());
            notifyChange(nodeId, previousState, state, previousVersion, version);
        }
    }

    /**
     * Runs one heartbeat round: sends the local node's state and version to
     * every other known node and re-evaluates each peer's liveness from the
     * failure detector. A failure for one peer is logged and does not stop
     * the round for the remaining peers.
     */
    private void heartbeat() {
        try {
            NodeId localId = this.localNode.id();
            Set<ControllerNode> peers = this.allNodes.values().stream()
                    .filter(node -> !node.id().equals(localId))
                    .collect(Collectors.toSet());
            byte[] payload = SERIALIZER.encode(
                    new HeartbeatMessage(this.localNode, this.nodeStates.get(localId), this.localVersion));
            for (ControllerNode peer : peers) {
                try {
                    heartbeatToPeer(payload, peer);
                    evaluatePeerLiveness(peer.id());
                } catch (Exception e) {
                    log.debug("Failed to process heartbeat for peer {}", peer, e);
                }
            }
        } catch (Exception e) {
            log.debug("Failed to send heartbeat", e);
        }
    }

    /**
     * Moves a peer between ACTIVE and INACTIVE according to its phi value.
     *
     * @param peerId identifier of the peer to evaluate
     */
    private void evaluatePeerLiveness(NodeId peerId) {
        ControllerNode.State state = this.nodeStates.get(peerId);
        if (state == null) {
            // No state recorded (e.g. the node is being added or removed concurrently); nothing to transition.
            return;
        }
        if (this.failureDetector.phi(peerId) < this.phiFailureThreshold) {
            if (state == ControllerNode.State.INACTIVE) {
                updateNode(peerId, ControllerNode.State.ACTIVE, null);
            }
        } else if (state.isActive()) {
            updateNode(peerId, ControllerNode.State.INACTIVE, null);
            this.failureDetector.reset(peerId);
        }
    }

    /**
     * Notifies the delegate of a node change, unless neither state nor
     * version changed or the node is not in the store.
     *
     * @param nodeId   node identifier
     * @param state    previous state
     * @param state2   new state; selects the event type (READY, ACTIVE, otherwise deactivated)
     * @param version  previous version
     * @param version2 new version
     */
    private void notifyChange(NodeId nodeId, ControllerNode.State state, ControllerNode.State state2, Version version, Version version2) {
        if (state == state2 && Objects.equals(version, version2)) {
            return;
        }
        ControllerNode node = allNodes.get(nodeId);
        if (node == null) {
            log.debug("Could not find node {} in the cluster, ignoring state change", nodeId);
            return;
        }
        ClusterEvent.Type eventType;
        if (state2 == ControllerNode.State.READY) {
            eventType = ClusterEvent.Type.INSTANCE_READY;
        } else if (state2 == ControllerNode.State.ACTIVE) {
            eventType = ClusterEvent.Type.INSTANCE_ACTIVATED;
        } else {
            eventType = ClusterEvent.Type.INSTANCE_DEACTIVATED;
        }
        notifyDelegate(new ClusterEvent(eventType, node));
    }

    /**
     * Sends an already-encoded heartbeat to one peer asynchronously; a send
     * failure is only traced.
     *
     * @param bArr           encoded heartbeat message
     * @param controllerNode peer to send to
     */
    private void heartbeatToPeer(byte[] bArr, ControllerNode controllerNode) {
        Endpoint target = new Endpoint(controllerNode.ip(), controllerNode.tcpPort());
        messagingService.sendAsync(target, HEARTBEAT_MESSAGE, bArr).whenComplete((ignoredResult, error) -> {
            if (error == null) {
                return;
            }
            log.trace("Sending heartbeat to {} failed", target, error);
        });
    }

    /**
     * Returns the time at which the node's state or version was last changed.
     *
     * @param nodeId node identifier; must not be null
     * @return time of the last recorded update, or null when none is recorded
     * @throws NullPointerException if {@code nodeId} is null
     */
    public DateTime getLastUpdated(NodeId nodeId) {
        // Same explicit check as the other lookups, so a null id fails with the shared message.
        Preconditions.checkNotNull(nodeId, INSTANCE_ID_NULL);
        return this.nodeLastUpdatedTimes.get(nodeId);
    }

    /**
     * Reads the "heartbeatInterval" and "phiFailureThreshold" properties from
     * the component configuration service and applies them. Does nothing when
     * the (optional) configuration service is not bound.
     */
    private void readComponentConfiguration() {
        // Snapshot the field: the reference is optional and dynamic, so it may be unbound.
        ComponentConfigService configService = this.cfgService;
        if (configService == null) {
            log.debug("Component configuration service is not bound, keeping current settings");
            return;
        }
        Iterable<ConfigProperty> properties = configService.getProperties(getClass().getName());
        if (properties == null) {
            return;
        }
        for (ConfigProperty property : properties) {
            String name = property.name();
            if ("heartbeatInterval".equals(name)) {
                configureHeartbeatInterval(property.value());
            } else if ("phiFailureThreshold".equals(name)) {
                configurePhiFailureThreshold(property.value());
            }
        }
    }

    /**
     * Applies a configured heartbeat interval and restarts the sender when
     * the effective interval changes.
     *
     * @param value raw property value; null means "not configured"
     */
    private void configureHeartbeatInterval(String value) {
        if (value == null) {
            int previous = this.heartbeatInterval;
            setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL);
            if (previous != this.heartbeatInterval) {
                // The running schedule still uses the old delay; reschedule with the default.
                restartHeartbeatSender();
            }
            log.info("Heartbeat interval time is not configured, default value is {}", DEFAULT_HEARTBEAT_INTERVAL);
            return;
        }
        int requested = parseConfiguredInt("heartbeatInterval", value, DEFAULT_HEARTBEAT_INTERVAL, this.heartbeatInterval);
        if (requested <= 0) {
            log.warn("Ignoring non-positive heartbeat interval {}", requested);
        } else if (requested != this.heartbeatInterval) {
            this.heartbeatInterval = requested;
            restartHeartbeatSender();
        }
        log.info("Configured. Heartbeat interval time is configured to {}", this.heartbeatInterval);
    }

    /**
     * Applies a configured phi failure threshold.
     *
     * @param value raw property value; null means "not configured"
     */
    private void configurePhiFailureThreshold(String value) {
        if (value == null) {
            setPhiFailureThreshold(DEFAULT_PHI_FAILURE_THRESHOLD);
            log.info("Phi failure threshold is not configured, default value is {}", DEFAULT_PHI_FAILURE_THRESHOLD);
            return;
        }
        // An empty value falls back to the phi default (not the heartbeat interval default).
        setPhiFailureThreshold(parseConfiguredInt("phiFailureThreshold", value, DEFAULT_PHI_FAILURE_THRESHOLD, this.phiFailureThreshold));
        log.info("Configured. Phi failure threshold is configured to {}", this.phiFailureThreshold);
    }

    /**
     * Parses an integer property value.
     *
     * @param propertyName property name, used for logging only
     * @param value        non-null raw value
     * @param defaultValue result for an empty or whitespace-only value
     * @param currentValue result for a value that is not a valid integer
     * @return the parsed value, or one of the fallbacks described above
     */
    private int parseConfiguredInt(String propertyName, String value, int defaultValue, int currentValue) {
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(trimmed);
        } catch (NumberFormatException e) {
            log.warn("Ignoring invalid value '{}' for {}; keeping {}", value, propertyName, currentValue, e);
            return currentValue;
        }
    }

    /**
     * Sets the heartbeat interval; a non-positive value is logged and
     * replaced by the default interval.
     *
     * @param i requested interval
     */
    private void setHeartbeatInterval(int i) {
        if (i > 0) {
            heartbeatInterval = i;
            return;
        }
        log.warn("Interval must be greater than zero");
        heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
    }

    /**
     * Sets the phi failure threshold without validation.
     *
     * @param i new threshold
     */
    private void setPhiFailureThreshold(int i) {
        phiFailureThreshold = i;
    }

    /**
     * Replaces the heartbeat sender with a new executor scheduled at the
     * current heartbeat interval, then shuts the previous one down. If the
     * new executor cannot be created or scheduled, the previous sender stays
     * in place and keeps running.
     */
    private void restartHeartbeatSender() {
        ScheduledExecutorService previousSender = this.heartBeatSender;
        ScheduledExecutorService newSender = null;
        try {
            newSender = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/cluster/membership", "heartbeat-sender-%d", log));
            newSender.scheduleWithFixedDelay(this::heartbeat, 0L, this.heartbeatInterval, TimeUnit.MILLISECONDS);
            // Swap only once the new sender is scheduled, so a failure never loses the working one.
            this.heartBeatSender = newSender;
            previousSender.shutdown();
        } catch (Exception e) {
            if (newSender != null && newSender != this.heartBeatSender) {
                // The new executor was never installed; do not leak its thread.
                newSender.shutdownNow();
            }
            log.warn("Failed to restart heartbeat sender", e);
        }
    }

    /**
     * Binds the version service.
     *
     * @param versionService service being bound
     */
    protected void bindVersionService(VersionService versionService) {
        this.versionService = versionService;
    }

    /**
     * Unbinds the version service if it is the one currently bound.
     *
     * @param versionService service being unbound
     */
    protected void unbindVersionService(VersionService versionService) {
        if (this.versionService != versionService) {
            return;
        }
        this.versionService = null;
    }

    /**
     * Binds the cluster metadata service.
     *
     * @param clusterMetadataService service being bound
     */
    protected void bindClusterMetadataService(ClusterMetadataService clusterMetadataService) {
        this.clusterMetadataService = clusterMetadataService;
    }

    /**
     * Unbinds the cluster metadata service if it is the one currently bound.
     *
     * @param clusterMetadataService service being unbound
     */
    protected void unbindClusterMetadataService(ClusterMetadataService clusterMetadataService) {
        if (this.clusterMetadataService != clusterMetadataService) {
            return;
        }
        this.clusterMetadataService = null;
    }

    /**
     * Binds the messaging service.
     *
     * @param messagingService service being bound
     */
    protected void bindMessagingService(MessagingService messagingService) {
        this.messagingService = messagingService;
    }

    /**
     * Unbinds the messaging service if it is the one currently bound.
     *
     * @param messagingService service being unbound
     */
    protected void unbindMessagingService(MessagingService messagingService) {
        if (this.messagingService != messagingService) {
            return;
        }
        this.messagingService = null;
    }
}
