package org.onosproject.store.consistent.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true, enabled = true)
/* loaded from: input_file:org/onosproject/store/consistent/impl/DistributedLeadershipManager.class */
public class DistributedLeadershipManager implements LeadershipService {

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected EventDeliveryService eventDispatcher;
    private ScheduledExecutorService electionRunner;
    private ScheduledExecutorService lockExecutor;
    private ScheduledExecutorService staleLeadershipPurgeExecutor;
    private ScheduledExecutorService leadershipRefresher;
    private ConsistentMap<String, NodeId> leaderMap;
    private ConsistentMap<String, List<NodeId>> candidateMap;
    private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
    private NodeId localNodeId;
    private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
    private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
    private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
    private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
    private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
    private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
    private Set<String> activeTopics = Sets.newConcurrentHashSet();
    private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
    private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);

    /* loaded from: input_file:org/onosproject/store/consistent/impl/DistributedLeadershipManager$InternalClusterEventListener.class */
    private class InternalClusterEventListener implements ClusterEventListener {
        private InternalClusterEventListener() {
        }

        public void event(ClusterEvent clusterEvent) {
            if (clusterEvent.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED || clusterEvent.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
                DistributedLeadershipManager.this.scheduleStaleLeadershipPurge(0);
            }
        }
    }

    @Activate
    public void activate() {
        this.leaderMap = this.storageService.consistentMapBuilder().withName("onos-topic-leaders").withSerializer(SERIALIZER).withPartitionsDisabled().build();
        this.candidateMap = this.storageService.consistentMapBuilder().withName("onos-topic-candidates").withSerializer(SERIALIZER).withPartitionsDisabled().build();
        this.leaderMap.addListener(mapEvent -> {
            this.log.debug("Received {}", mapEvent);
            LeadershipEvent.Type type = null;
            if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
                type = LeadershipEvent.Type.LEADER_ELECTED;
            } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
                type = LeadershipEvent.Type.LEADER_BOOTED;
            }
            onLeadershipEvent(new LeadershipEvent(type, new Leadership((String) mapEvent.key(), (NodeId) mapEvent.value().value(), mapEvent.value().version(), mapEvent.value().creationTime())));
        });
        this.candidateMap.addListener(mapEvent2 -> {
            this.log.debug("Received {}", mapEvent2);
            if (mapEvent2.type() == MapEvent.Type.INSERT || mapEvent2.type() == MapEvent.Type.UPDATE) {
                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, new Leadership((String) mapEvent2.key(), (List) mapEvent2.value().value(), mapEvent2.value().version(), mapEvent2.value().creationTime())));
            } else {
                this.log.error("Entries must not be removed from candidate map");
            }
        });
        this.localNodeId = this.clusterService.getLocalNode().id();
        this.electionRunner = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/store/leadership", "election-runner"));
        this.lockExecutor = Executors.newScheduledThreadPool(4, Tools.groupedThreads("onos/store/leadership", "election-thread-%d"));
        this.staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
        this.leadershipRefresher = Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/store/leadership", "refresh-thread"));
        this.clusterService.addListener(this.clusterEventListener);
        this.electionRunner.scheduleWithFixedDelay(this::electLeaders, 0L, 2L, TimeUnit.SECONDS);
        this.leadershipRefresher.scheduleWithFixedDelay(this::refreshLeaderBoard, 0L, 2L, TimeUnit.SECONDS);
        this.listenerRegistry = new ListenerRegistry<>();
        this.eventDispatcher.addSink(LeadershipEvent.class, this.listenerRegistry);
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        if (this.clusterService.getNodes().size() > 1) {
            this.leaderBoard.forEach((str, leadership) -> {
                if (this.localNodeId.equals(leadership.leader())) {
                    withdraw(str);
                }
            });
        }
        this.clusterService.removeListener(this.clusterEventListener);
        this.eventDispatcher.removeSink(LeadershipEvent.class);
        this.electionRunner.shutdown();
        this.lockExecutor.shutdown();
        this.staleLeadershipPurgeExecutor.shutdown();
        this.leadershipRefresher.shutdown();
        this.log.info("Stopped");
    }

    public Map<String, Leadership> getLeaderBoard() {
        return ImmutableMap.copyOf(this.leaderBoard);
    }

    public Map<String, List<NodeId>> getCandidates() {
        return Maps.toMap(this.candidateBoard.keySet(), this::getCandidates);
    }

    public List<NodeId> getCandidates(String str) {
        Leadership leadership = this.candidateBoard.get(str);
        return leadership == null ? ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
    }

    public NodeId getLeader(String str) {
        Leadership leadership = this.leaderBoard.get(str);
        if (leadership != null) {
            return leadership.leader();
        }
        return null;
    }

    public Leadership getLeadership(String str) {
        Preconditions.checkArgument(str != null);
        return this.leaderBoard.get(str);
    }

    public Set<String> ownedTopics(NodeId nodeId) {
        Preconditions.checkArgument(nodeId != null);
        return (Set) this.leaderBoard.entrySet().stream().filter(entry -> {
            return nodeId.equals(((Leadership) entry.getValue()).leader());
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet());
    }

    public CompletableFuture<Leadership> runForLeadership(String str) {
        this.log.debug("Running for leadership for topic: {}", str);
        CompletableFuture<Leadership> completableFuture = new CompletableFuture<>();
        doRunForLeadership(str, completableFuture);
        return completableFuture;
    }

    private void doRunForLeadership(String str, CompletableFuture<Leadership> completableFuture) {
        try {
            Versioned computeIf = this.candidateMap.computeIf(str, list -> {
                return list == null || !list.contains(this.localNodeId);
            }, (str2, list2) -> {
                if (list2 == null) {
                    return ImmutableList.of(this.localNodeId);
                }
                LinkedList newLinkedList = Lists.newLinkedList();
                newLinkedList.addAll(list2);
                newLinkedList.add(this.localNodeId);
                return newLinkedList;
            });
            this.log.debug("In the leadership race for topic {} with candidates {}", str, computeIf);
            this.activeTopics.add(str);
            Leadership electLeader = electLeader(str, (List) computeIf.value());
            if (electLeader == null) {
                this.pendingFutures.put(str, completableFuture);
            } else {
                completableFuture.complete(electLeader);
            }
        } catch (ConsistentMapException e) {
            this.log.debug("Failed to enter topic leader race for {}. Retrying.", str, e);
            rerunForLeadership(str, completableFuture);
        }
    }

    public CompletableFuture<Void> withdraw(String str) {
        this.activeTopics.remove(str);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        doWithdraw(str, completableFuture);
        return completableFuture;
    }

    private void doWithdraw(String str, CompletableFuture<Void> completableFuture) {
        if (this.activeTopics.contains(str)) {
            completableFuture.completeExceptionally(new CancellationException(String.format("%s is now a active topic", str)));
        }
        try {
            ConsistentMap<String, NodeId> consistentMap = this.leaderMap;
            NodeId nodeId = this.localNodeId;
            nodeId.getClass();
            consistentMap.computeIf(str, (v1) -> {
                return r2.equals(v1);
            }, (str2, nodeId2) -> {
                return null;
            });
            this.candidateMap.computeIf(str, list -> {
                return list != null && list.contains(this.localNodeId);
            }, (str3, list2) -> {
                return (List) list2.stream().filter(nodeId3 -> {
                    return !this.localNodeId.equals(nodeId3);
                }).collect(Collectors.toList());
            });
            completableFuture.complete(null);
        } catch (Exception e) {
            this.log.debug("Failed to verify (and clear) any lock this node might be holding for {}", str, e);
            retryWithdraw(str, completableFuture);
        }
    }

    public boolean stepdown(String str) {
        if (!this.activeTopics.contains(str) || !Objects.equals(this.localNodeId, getLeader(str))) {
            return false;
        }
        try {
            ConsistentMap<String, NodeId> consistentMap = this.leaderMap;
            NodeId nodeId = this.localNodeId;
            nodeId.getClass();
            return consistentMap.computeIf(str, (v1) -> {
                return r2.equals(v1);
            }, (str2, nodeId2) -> {
                return null;
            }) == null;
        } catch (Exception e) {
            this.log.warn("Error executing stepdown for {}", str, e);
            return false;
        }
    }

    public void addListener(LeadershipEventListener leadershipEventListener) {
        this.listenerRegistry.addListener(leadershipEventListener);
    }

    public void removeListener(LeadershipEventListener leadershipEventListener) {
        this.listenerRegistry.removeListener(leadershipEventListener);
    }

    public boolean makeTopCandidate(String str, NodeId nodeId) {
        Versioned computeIf = this.candidateMap.computeIf(str, list -> {
            return (list == null || !list.contains(nodeId) || nodeId.equals(Iterables.getFirst(list, (Object) null))) ? false : true;
        }, (str2, list2) -> {
            ArrayList arrayList = new ArrayList(list2.size());
            arrayList.add(nodeId);
            Stream filter = list2.stream().filter(nodeId2 -> {
                return !nodeId.equals(nodeId2);
            });
            arrayList.getClass();
            filter.forEach((v1) -> {
                r1.add(v1);
            });
            return arrayList;
        });
        List emptyList = computeIf != null ? (List) computeIf.value() : Collections.emptyList();
        return emptyList.size() > 0 && nodeId.equals(emptyList.get(0));
    }

    private Leadership electLeader(String str, List<NodeId> list) {
        Leadership leadership = getLeadership(str);
        if (leadership != null) {
            return leadership;
        }
        try {
            Versioned computeIfAbsent = this.localNodeId.equals(list.stream().filter(nodeId -> {
                return this.clusterService.getState(nodeId) == ControllerNode.State.ACTIVE;
            }).findFirst().orElse(null)) ? this.leaderMap.computeIfAbsent(str, str2 -> {
                return this.localNodeId;
            }) : this.leaderMap.get(str);
            if (computeIfAbsent == null) {
                return null;
            }
            Leadership leadership2 = new Leadership(str, (NodeId) computeIfAbsent.value(), computeIfAbsent.version(), computeIfAbsent.creationTime());
            onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership2));
            return leadership2;
        } catch (Exception e) {
            this.log.debug("Failed to elect leader for {}", str, e);
            return null;
        }
    }

    private void electLeaders() {
        try {
            this.candidateMap.entrySet().forEach(entry -> {
                String str = (String) entry.getKey();
                Versioned versioned = (Versioned) entry.getValue();
                if (this.activeTopics.contains(str)) {
                    this.lockExecutor.submit(() -> {
                        CompletableFuture<Leadership> remove;
                        Leadership electLeader = electLeader(str, (List) versioned.value());
                        if (electLeader == null || (remove = this.pendingFutures.remove(str)) == null) {
                            return;
                        }
                        remove.complete(electLeader);
                    });
                }
                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, new Leadership(str, (List) versioned.value(), versioned.version(), versioned.creationTime())));
            });
        } catch (Exception e) {
            this.log.debug("Failure electing leaders", e);
        }
    }

    private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
        this.log.trace("Leadership Event: time = {} type = {} event = {}", new Object[]{Long.valueOf(leadershipEvent.time()), leadershipEvent.type(), leadershipEvent});
        Leadership leadership = (Leadership) leadershipEvent.subject();
        LeadershipEvent.Type type = leadershipEvent.type();
        String str = leadership.topic();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (type.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
            this.leaderBoard.compute(str, (str2, leadership2) -> {
                if (leadership2 != null && leadership2.epoch() >= leadership.epoch()) {
                    return leadership2;
                }
                atomicBoolean.set(true);
                return leadership;
            });
        } else if (type.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
            this.leaderBoard.compute(str, (str3, leadership3) -> {
                if (leadership3 != null && leadership3.epoch() > leadership.epoch()) {
                    return leadership3;
                }
                atomicBoolean.set(true);
                return null;
            });
        } else {
            if (!type.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
                throw new IllegalStateException("Unknown event type.");
            }
            this.candidateBoard.compute(str, (str4, leadership4) -> {
                if (leadership4 != null && leadership4.epoch() >= leadership.epoch()) {
                    return leadership4;
                }
                atomicBoolean.set(true);
                return leadership;
            });
        }
        if (atomicBoolean.get()) {
            this.eventDispatcher.post(leadershipEvent);
        }
    }

    private void rerunForLeadership(String str, CompletableFuture<Leadership> completableFuture) {
        this.lockExecutor.schedule(() -> {
            doRunForLeadership(str, completableFuture);
        }, RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS), TimeUnit.MILLISECONDS);
    }

    private void retryWithdraw(String str, CompletableFuture<Void> completableFuture) {
        this.lockExecutor.schedule(() -> {
            doWithdraw(str, completableFuture);
        }, RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS), TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleStaleLeadershipPurge(int i) {
        if (this.staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
            this.staleLeadershipPurgeExecutor.schedule(this::purgeStaleLeadership, i, TimeUnit.SECONDS);
        }
    }

    private void purgeStaleLeadership() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        try {
            this.staleLeadershipPurgeScheduled.set(false);
            this.leaderMap.entrySet().stream().filter(entry -> {
                return this.clusterService.getState((NodeId) ((Versioned) entry.getValue()).value()) == ControllerNode.State.INACTIVE;
            }).forEach(entry2 -> {
                String str = (String) entry2.getKey();
                NodeId nodeId = (NodeId) ((Versioned) entry2.getValue()).value();
                try {
                    ConsistentMap<String, NodeId> consistentMap = this.leaderMap;
                    nodeId.getClass();
                    consistentMap.computeIf(str, (v1) -> {
                        return r2.equals(v1);
                    }, (str2, nodeId2) -> {
                        return null;
                    });
                } catch (Exception e) {
                    this.log.debug("Failed to purge stale lock held by {} for {}", new Object[]{nodeId, str, e});
                    atomicBoolean.set(true);
                }
            });
            this.candidateMap.entrySet().forEach(entry3 -> {
                String str = (String) entry3.getKey();
                Versioned versioned = (Versioned) entry3.getValue();
                List emptyList = versioned != null ? (List) versioned.value() : Collections.emptyList();
                List list = (List) emptyList.stream().filter(nodeId -> {
                    return this.clusterService.getState(nodeId) == ControllerNode.State.ACTIVE;
                }).filter(nodeId2 -> {
                    return !this.localNodeId.equals(nodeId2) || this.activeTopics.contains(str);
                }).collect(Collectors.toList());
                if (list.size() < emptyList.size()) {
                    Sets.SetView difference = Sets.difference(Sets.newHashSet(emptyList), Sets.newHashSet(list));
                    try {
                        this.candidateMap.computeIf(str, list2 -> {
                            return list2.stream().filter(nodeId3 -> {
                                return this.clusterService.getState(nodeId3) == ControllerNode.State.INACTIVE;
                            }).count() > 0;
                        }, (str2, list3) -> {
                            return (List) list3.stream().filter(nodeId3 -> {
                                return this.clusterService.getState(nodeId3) == ControllerNode.State.ACTIVE;
                            }).filter(nodeId4 -> {
                                return !this.localNodeId.equals(nodeId4) || this.activeTopics.contains(str);
                            }).collect(Collectors.toList());
                        });
                    } catch (Exception e) {
                        this.log.debug("Failed to evict inactive candidates {} from candidate list for {}", new Object[]{difference, str, e});
                        atomicBoolean.set(true);
                    }
                }
            });
        } catch (Exception e) {
            this.log.debug("Failure purging state leadership.", e);
            atomicBoolean.set(true);
        }
        if (atomicBoolean.get()) {
            this.log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
            scheduleStaleLeadershipPurge(2);
        }
    }

    private void refreshLeaderBoard() {
        try {
            HashMap newHashMap = Maps.newHashMap();
            this.leaderMap.entrySet().forEach(entry -> {
                String str = (String) entry.getKey();
                Versioned versioned = (Versioned) entry.getValue();
                newHashMap.put(str, new Leadership(str, (NodeId) versioned.value(), versioned.version(), versioned.creationTime()));
            });
            MapDifference difference = Maps.difference(ImmutableMap.copyOf(this.leaderBoard), newHashMap);
            difference.entriesOnlyOnLeft().forEach((str, leadership) -> {
                this.log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
            });
            difference.entriesOnlyOnRight().forEach((str2, leadership2) -> {
                this.log.debug("Adding {} to leaderboard. It is now the active leader.", leadership2);
                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership2));
            });
            difference.entriesDiffering().forEach((str3, valueDifference) -> {
                Leadership leadership3 = (Leadership) valueDifference.leftValue();
                Leadership leadership4 = (Leadership) valueDifference.rightValue();
                if (leadership3.epoch() < leadership4.epoch()) {
                    this.log.debug("Updated {} in leaderboard.", leadership4);
                    onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership4));
                }
            });
        } catch (Exception e) {
            this.log.debug("Failed to refresh leader board", e);
        }
    }

    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService == storageService) {
            this.storageService = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    protected void bindEventDispatcher(EventDeliveryService eventDeliveryService) {
        this.eventDispatcher = eventDeliveryService;
    }

    protected void unbindEventDispatcher(EventDeliveryService eventDeliveryService) {
        if (this.eventDispatcher == eventDeliveryService) {
            this.eventDispatcher = null;
        }
    }
}
