package org.onosproject.store.consistent.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true, enabled = true)
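/*
 * LeadershipService implementation that elects one leader per topic by contending for
 * entries in the "onos-leader-locks" consistent map, keeps a local leader board, and
 * exchanges leadership events with peers through the cluster communication service.
 * (Decompiled from org/onosproject/store/consistent/impl/DistributedLeadershipManager.class.)
 */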
public class DistributedLeadershipManager implements LeadershipService {

    // Used in activate() to create the consistent map that backs the leader locks.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    // Source of the local node id and of the activity state of lock holders.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    // Used to subscribe to, and broadcast, leadership events between cluster nodes.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    // Receives LeadershipEvents posted by this class; the listener registry is added to it as a sink.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected EventDeliveryService eventDispatcher;
    // Executor handed to the cluster communicator for running InternalLeadershipEventListener.
    private ExecutorService messageHandlingExecutor;
    // Runs the delayed re-attempts scheduled by retry().
    private ScheduledExecutorService retryLeaderLockExecutor;
    // Periodically runs purgeStaleLocks().
    private ScheduledExecutorService deadLockDetectionExecutor;
    // Periodically runs sendLeadershipStatus().
    private ScheduledExecutorService leadershipStatusBroadcaster;
    // Topic -> id of the node currently holding the leader lock for that topic.
    private ConsistentMap<String, NodeId> lockMap;
    // Registry behind addListener()/removeListener(); created in activate().
    private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
    // Id of the local node, captured in activate().
    private NodeId localNodeId;
    // NOTE(review): these three constants are not referenced by name in the code visible here;
    // the scheduling calls use the literal 2L instead (presumably inlined by the compiler - confirm).
    private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
    private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
    private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
    // Subject under which leadership events are broadcast to, and received from, peers.
    private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT = new MessageSubject("distributed-leadership-manager-events");
    // Serializer used to encode and decode the LeadershipEvent payload of cluster messages.
    private static final KryoSerializer SERIALIZER = new KryoSerializer() { // from class: org.onosproject.store.consistent.impl.DistributedLeadershipManager.1
        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).build().populate(1);
        }
    };
    private final Logger log = LoggerFactory.getLogger(getClass());
    // Local view of topic -> current Leadership; also used as the monitor guarding compound updates.
    private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
    // Topics for which this node is currently running for leadership.
    private Set<String> activeTopics = Sets.newConcurrentHashSet();

    /* loaded from: input_file:org/onosproject/store/consistent/impl/DistributedLeadershipManager$InternalLeadershipEventListener.class */
    /**
     * Handles leadership events received from peer nodes: applies them to the local
     * leader board and, when the board actually changed, re-posts the event locally.
     */
    private class InternalLeadershipEventListener implements ClusterMessageHandler {
        private InternalLeadershipEventListener() {
        }

        /**
         * Decodes the event carried by the message and merges it into the leader board.
         *
         * @param clusterMessage message whose payload is a serialized LeadershipEvent
         * @throws IllegalStateException if the event type is neither LEADER_ELECTED nor LEADER_BOOTED
         */
        public void handle(ClusterMessage clusterMessage) {
            LeadershipEvent event = (LeadershipEvent) SERIALIZER.decode(clusterMessage.payload());
            log.debug("Leadership Event: time = {} type = {} event = {}", event.time(), event.type(), event);

            Leadership update = event.subject();
            LeadershipEvent.Type eventType = event.type();
            String topic = update.topic();

            synchronized (leaderBoard) {
                Leadership current = leaderBoard.get(topic);
                boolean accepted = false;

                if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
                    // Accept only a first entry or one with a strictly newer epoch.
                    if (current == null || current.epoch() < update.epoch()) {
                        leaderBoard.put(topic, update);
                        accepted = true;
                    }
                } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
                    // Remove only the exact leadership term being revoked.
                    if (current != null && current.epoch() == update.epoch()) {
                        leaderBoard.remove(topic);
                        accepted = true;
                    }
                } else {
                    throw new IllegalStateException("Unknown event type.");
                }

                if (accepted) {
                    eventDispatcher.post(event);
                }
            }
        }
    }

    /**
     * Activates the component: creates the lock map, the executors and the listener
     * registry, then starts receiving peer events and running the periodic tasks.
     *
     * <p>The listener registry is created and registered as the LeadershipEvent sink
     * before the message subscriber and the zero-delay periodic tasks are started,
     * because both of those can post LeadershipEvents to the event dispatcher as soon
     * as they run.
     */
    @Activate
    public void activate() {
        this.lockMap = this.storageService.createConsistentMap("onos-leader-locks", new Serializer() {
            KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build();

            public <T> byte[] encode(T t) {
                return this.kryo.serialize(t);
            }

            public <T> T decode(byte[] bArr) {
                return (T) this.kryo.deserialize(bArr);
            }
        });
        this.localNodeId = this.clusterService.getLocalNode().id();

        // Register the sink first so nothing started below can post an event before it exists.
        this.listenerRegistry = new AbstractListenerRegistry<>();
        this.eventDispatcher.addSink(LeadershipEvent.class, this.listenerRegistry);

        this.messageHandlingExecutor = Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/store/leadership", "message-handler"));
        this.retryLeaderLockExecutor = Executors.newScheduledThreadPool(4, Tools.groupedThreads("onos/store/leadership", "election-thread-%d"));
        this.deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/store/leadership", "dead-lock-detector"));
        this.leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/store/leadership", "peer-updater"));

        this.clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener(), this.messageHandlingExecutor);

        // Both periodic tasks start immediately and then repeat every two seconds.
        this.deadLockDetectionExecutor.scheduleWithFixedDelay(this::purgeStaleLocks, 0L, 2L, TimeUnit.SECONDS);
        this.leadershipStatusBroadcaster.scheduleWithFixedDelay(this::sendLeadershipStatus, 0L, 2L, TimeUnit.SECONDS);

        this.log.info("Started.");
    }

    /**
     * Deactivates the component: gives up every topic this node currently leads,
     * detaches from the event dispatcher and cluster communicator, and stops the executors.
     */
    @Deactivate
    public void deactivate() {
        this.leaderBoard.forEach((topic, leadership) -> {
            if (this.localNodeId.equals(leadership.leader())) {
                withdraw(topic);
            }
        });
        this.eventDispatcher.removeSink(LeadershipEvent.class);
        this.clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
        this.messageHandlingExecutor.shutdown();
        // A scheduled pool still runs already-queued delayed tasks after shutdown() by default.
        // Topics this node was contending for but not leading are still in activeTopics, so a
        // queued retry could acquire a leader lock after deactivation. shutdownNow() discards
        // the pending retries instead.
        this.retryLeaderLockExecutor.shutdownNow();
        this.deadLockDetectionExecutor.shutdown();
        this.leadershipStatusBroadcaster.shutdown();
        this.log.info("Stopped.");
    }

    /**
     * Returns an immutable copy of the local leader board.
     *
     * @return snapshot mapping each known topic to its current leadership
     */
    public Map<String, Leadership> getLeaderBoard() {
        return ImmutableMap.copyOf(leaderBoard);
    }

    /**
     * Looks up the leader recorded on the local leader board for a topic.
     *
     * @param str topic name
     * @return id of the leader, or null when the board has no entry for the topic
     */
    public NodeId getLeader(String str) {
        Leadership current = leaderBoard.get(str);
        return current == null ? null : current.leader();
    }

    /**
     * Returns the leadership recorded on the local leader board for a topic.
     *
     * @param str topic name; must not be null
     * @return the recorded leadership, or null when the board has no entry for the topic
     * @throws IllegalArgumentException if the topic is null
     */
    public Leadership getLeadership(String str) {
        boolean topicGiven = str != null;
        Preconditions.checkArgument(topicGiven);
        return leaderBoard.get(str);
    }

    /**
     * Collects the topics whose leader, according to the local leader board, is the given node.
     *
     * @param nodeId node to look for; must not be null
     * @return set of topic names led by the node (possibly empty)
     * @throws IllegalArgumentException if the node id is null
     */
    public Set<String> ownedTopics(NodeId nodeId) {
        Preconditions.checkArgument(nodeId != null);
        return leaderBoard.entrySet().stream()
                .filter(entry -> nodeId.equals(entry.getValue().leader()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    /**
     * Marks the topic as one this node is contending for and makes a first attempt
     * to take its leader lock.
     *
     * @param str topic name
     */
    public void runForLeadership(String str) {
        log.debug("Running for leadership for topic: {}", str);
        activeTopics.add(str);
        tryLeaderLock(str);
    }

    /**
     * Stops contending for a topic and, if this node holds the topic's leader lock,
     * releases the lock and announces that the leader was removed.
     *
     * <p>Failures while checking or releasing the lock are logged at debug level
     * and otherwise ignored.
     *
     * @param str topic name
     */
    public void withdraw(String str) {
        this.activeTopics.remove(str);
        try {
            Versioned<?> lock = this.lockMap.get(str);
            if (lock == null) {
                // No lock entry exists for the topic (tryLeaderLock treats a null result the
                // same way), so there is nothing to release. Previously this dereferenced null
                // and relied on the catch below to swallow the NullPointerException.
                this.log.debug("No leadership lock exists for {}; nothing to withdraw", str);
                return;
            }
            // The versioned remove only succeeds if the entry is unchanged since it was read.
            if (Objects.equals(lock.value(), this.localNodeId) && this.lockMap.remove(str, lock.version())) {
                this.log.info("Gave up leadership for {}", str);
                notifyRemovedLeader(str, this.localNodeId, lock.version(), lock.creationTime());
            }
        } catch (Exception e) {
            this.log.debug("Failed to verify (and clear) any lock this node might be holding for {}", str, e);
        }
    }

    /**
     * Registers a listener with the listener registry created in activate().
     *
     * @param leadershipEventListener listener to add
     */
    public void addListener(LeadershipEventListener leadershipEventListener) {
        listenerRegistry.addListener(leadershipEventListener);
    }

    /**
     * Removes a listener from the listener registry.
     *
     * @param leadershipEventListener listener to remove
     */
    public void removeListener(LeadershipEventListener leadershipEventListener) {
        listenerRegistry.removeListener(leadershipEventListener);
    }

    /**
     * Makes one attempt to become leader for a topic, provided the topic is still active.
     * If another node holds the lock, the insert loses a race, or any exception occurs,
     * another attempt is scheduled through retry().
     *
     * @param str topic name
     */
    private void tryLeaderLock(String str) {
        if (!activeTopics.contains(str)) {
            return;
        }
        try {
            Versioned<?> holder = lockMap.get(str);
            if (holder == null) {
                // Lock is free: try to claim it.
                if (lockMap.putIfAbsent(str, localNodeId) != null) {
                    retry(str);
                    return;
                }
                log.info("Assumed leadership for {}", str);
                // Re-read the entry to obtain the version and creation time of the new lock.
                Versioned<?> acquired = lockMap.get(str);
                notifyNewLeader(str, localNodeId, acquired.version(), acquired.creationTime());
                return;
            }
            if (!localNodeId.equals(holder.value())) {
                retry(str);
                return;
            }
            log.info("Already has leadership for {}", str);
            notifyNewLeader(str, localNodeId, holder.version(), holder.creationTime());
        } catch (Exception e) {
            log.debug("Attempt to acquire leadership lock for topic {} failed", str, e);
            retry(str);
        }
    }

    /**
     * Records a newly elected leader on the local leader board and, if the board changed,
     * posts a LEADER_ELECTED event locally and broadcasts it to the cluster.
     *
     * @param str    topic name
     * @param nodeId id of the elected node
     * @param j      epoch of the leadership (the lock version at the call sites)
     * @param j2     election time (the lock creation time at the call sites)
     */
    private void notifyNewLeader(String str, NodeId nodeId, long j, long j2) {
        Leadership elected = new Leadership(str, nodeId, j, j2);
        synchronized (leaderBoard) {
            Leadership known = leaderBoard.get(str);
            // Ignore the update unless it is the first entry or carries a newer epoch.
            if (known != null && known.epoch() >= j) {
                return;
            }
            leaderBoard.put(str, elected);
        }
        LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, elected);
        eventDispatcher.post(event);
        clusterCommunicator.broadcast(
                new ClusterMessage(
                        clusterService.getLocalNode().id(),
                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
                        SERIALIZER.encode(event)));
    }

    /**
     * Removes a leader from the local leader board when the recorded epoch matches and,
     * if an entry was removed, posts a LEADER_BOOTED event locally and broadcasts it.
     *
     * @param str    topic name
     * @param nodeId id of the node that lost leadership
     * @param j      epoch of the leadership being removed
     * @param j2     election time of the leadership being removed
     */
    private void notifyRemovedLeader(String str, NodeId nodeId, long j, long j2) {
        Leadership removed = new Leadership(str, nodeId, j, j2);
        synchronized (leaderBoard) {
            Leadership known = leaderBoard.get(str);
            // Only the exact leadership term being revoked may be removed.
            if (known == null || known.epoch() != removed.epoch()) {
                return;
            }
            leaderBoard.remove(str);
        }
        LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, removed);
        eventDispatcher.post(event);
        clusterCommunicator.broadcast(
                new ClusterMessage(
                        clusterService.getLocalNode().id(),
                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
                        SERIALIZER.encode(event)));
    }

    /**
     * Schedules another leader-lock attempt for the topic two seconds from now.
     *
     * @param str topic name
     */
    private void retry(String str) {
        Runnable nextAttempt = () -> tryLeaderLock(str);
        retryLeaderLockExecutor.schedule(nextAttempt, 2L, TimeUnit.SECONDS);
    }

    /**
     * Scans every leader lock and removes those that are stale: locks whose holder is
     * reported INACTIVE by the cluster service, and locks held by the local node for a
     * topic it is no longer running for. Any failure while scanning is logged at debug level.
     */
    private void purgeStaleLocks() {
        try {
            lockMap.entrySet().forEach(entry -> {
                String topic = entry.getKey();
                Versioned<NodeId> lock = entry.getValue();
                NodeId holder = lock.value();
                long version = lock.version();
                long creationTime = lock.creationTime();

                if (clusterService.getState(holder) == ControllerNode.State.INACTIVE) {
                    log.info("Lock for {} is held by {} which is currently inactive", topic, holder);
                    purgeLock(topic, holder, version, creationTime);
                }

                boolean heldLocallyButNotWanted = localNodeId.equals(holder) && !activeTopics.contains(topic);
                if (heldLocallyButNotWanted) {
                    log.debug("Lock for {} is held by {} when it not running for leadership.", topic, holder);
                    purgeLock(topic, holder, version, creationTime);
                }
            });
        } catch (Exception e) {
            log.debug("Failed cleaning up stale locks", e);
        }
    }

    /**
     * Removes one lock entry if it is still at the given version and announces the removed
     * leader; a failure is logged as a warning and not propagated.
     */
    private void purgeLock(String topic, NodeId holder, long version, long creationTime) {
        try {
            if (lockMap.remove(topic, version)) {
                log.info("Purged stale lock held by {} for {}", holder, topic);
                notifyRemovedLeader(topic, holder, version, creationTime);
            }
        } catch (Exception e) {
            log.warn("Failed to purge stale lock held by {} for {}", holder, topic, e);
        }
    }

    /**
     * Broadcasts a LEADER_ELECTED event for every leader-board entry whose leader is the
     * local node. Any failure is logged at debug level.
     */
    private void sendLeadershipStatus() {
        try {
            leaderBoard.forEach((topic, leadership) -> {
                if (!leadership.leader().equals(localNodeId)) {
                    return;
                }
                clusterCommunicator.broadcast(
                        new ClusterMessage(
                                clusterService.getLocalNode().id(),
                                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
                                SERIALIZER.encode(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership))));
            });
        } catch (Exception e) {
            log.debug("Failed to send leadership updates", e);
        }
    }

    /** Stores the storage service reference handed to this component. */
    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    /** Clears the storage service reference, but only if it is the instance being unbound. */
    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService != storageService) {
            return;
        }
        this.storageService = null;
    }

    /** Stores the cluster service reference handed to this component. */
    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    /** Clears the cluster service reference, but only if it is the instance being unbound. */
    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService != clusterService) {
            return;
        }
        this.clusterService = null;
    }

    /** Stores the cluster communication service reference handed to this component. */
    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    /** Clears the cluster communicator reference, but only if it is the instance being unbound. */
    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator != clusterCommunicationService) {
            return;
        }
        this.clusterCommunicator = null;
    }

    /** Stores the event delivery service reference handed to this component. */
    protected void bindEventDispatcher(EventDeliveryService eventDeliveryService) {
        this.eventDispatcher = eventDeliveryService;
    }

    /** Clears the event dispatcher reference, but only if it is the instance being unbound. */
    protected void unbindEventDispatcher(EventDeliveryService eventDeliveryService) {
        if (this.eventDispatcher != eventDeliveryService) {
            return;
        }
        this.eventDispatcher = null;
    }
}
