package org.onosproject.store.statistic.impl;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true)
/* loaded from: input_file:org/onosproject/store/statistic/impl/DistributedStatisticStore.class */
public class DistributedStatisticStore implements StatisticStore {
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;
    protected static final KryoSerializer SERIALIZER = new KryoSerializer() { // from class: org.onosproject.store.statistic.impl.DistributedStatisticStore.1
        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).nextId(300).build();
        }
    };
    private ExecutorService messageHandlingExecutor;
    private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private Map<ConnectPoint, InternalStatisticRepresentation> representations = new ConcurrentHashMap();
    private Map<ConnectPoint, Set<FlowEntry>> previous = new ConcurrentHashMap();
    private Map<ConnectPoint, Set<FlowEntry>> current = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/onosproject/store/statistic/impl/DistributedStatisticStore$InternalStatisticRepresentation.class */
    public class InternalStatisticRepresentation {
        private final AtomicInteger counter;
        private final Set<FlowEntry> rules;

        private InternalStatisticRepresentation() {
            this.counter = new AtomicInteger(0);
            this.rules = new HashSet();
        }

        public void prepare() {
            this.counter.incrementAndGet();
        }

        public synchronized boolean remove(FlowRule flowRule) {
            this.rules.remove(flowRule);
            return this.counter.decrementAndGet() == 0;
        }

        public synchronized boolean submit(FlowEntry flowEntry) {
            if (this.rules.contains(flowEntry)) {
                this.rules.remove(flowEntry);
            }
            this.rules.add(flowEntry);
            return this.counter.get() == 0 || this.counter.decrementAndGet() == 0;
        }

        public synchronized Set<FlowEntry> get() {
            this.counter.set(this.rules.size());
            return Sets.newHashSet(this.rules);
        }
    }

    @Activate
    public void activate() {
        this.messageHandlingExecutor = Executors.newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, Tools.groupedThreads("onos/store/statistic", "message-handlers"));
        this.clusterCommunicator.addSubscriber(StatisticStoreMessageSubjects.GET_CURRENT, new ClusterMessageHandler() { // from class: org.onosproject.store.statistic.impl.DistributedStatisticStore.2
            public void handle(ClusterMessage clusterMessage) {
                clusterMessage.respond(DistributedStatisticStore.SERIALIZER.encode(DistributedStatisticStore.this.getCurrentStatisticInternal((ConnectPoint) DistributedStatisticStore.SERIALIZER.decode(clusterMessage.payload()))));
            }
        }, this.messageHandlingExecutor);
        this.clusterCommunicator.addSubscriber(StatisticStoreMessageSubjects.GET_PREVIOUS, new ClusterMessageHandler() { // from class: org.onosproject.store.statistic.impl.DistributedStatisticStore.3
            public void handle(ClusterMessage clusterMessage) {
                clusterMessage.respond(DistributedStatisticStore.SERIALIZER.encode(DistributedStatisticStore.this.getPreviousStatisticInternal((ConnectPoint) DistributedStatisticStore.SERIALIZER.decode(clusterMessage.payload()))));
            }
        }, this.messageHandlingExecutor);
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.clusterCommunicator.removeSubscriber(StatisticStoreMessageSubjects.GET_PREVIOUS);
        this.clusterCommunicator.removeSubscriber(StatisticStoreMessageSubjects.GET_CURRENT);
        this.messageHandlingExecutor.shutdown();
        this.log.info("Stopped");
    }

    public void prepareForStatistics(FlowRule flowRule) {
        InternalStatisticRepresentation orCreateRepresentation;
        ConnectPoint buildConnectPoint = buildConnectPoint(flowRule);
        if (buildConnectPoint == null) {
            return;
        }
        synchronized (this.representations) {
            orCreateRepresentation = getOrCreateRepresentation(buildConnectPoint);
        }
        orCreateRepresentation.prepare();
    }

    public synchronized void removeFromStatistics(FlowRule flowRule) {
        ConnectPoint buildConnectPoint = buildConnectPoint(flowRule);
        if (buildConnectPoint == null) {
            return;
        }
        InternalStatisticRepresentation internalStatisticRepresentation = this.representations.get(buildConnectPoint);
        if (internalStatisticRepresentation != null && internalStatisticRepresentation.remove(flowRule)) {
            updatePublishedStats(buildConnectPoint, Collections.emptySet());
        }
        Set<FlowEntry> set = this.current.get(buildConnectPoint);
        if (set != null) {
            set.remove(flowRule);
        }
        Set<FlowEntry> set2 = this.previous.get(buildConnectPoint);
        if (set2 != null) {
            set2.remove(flowRule);
        }
    }

    public void addOrUpdateStatistic(FlowEntry flowEntry) {
        InternalStatisticRepresentation internalStatisticRepresentation;
        ConnectPoint buildConnectPoint = buildConnectPoint(flowEntry);
        if (buildConnectPoint == null || (internalStatisticRepresentation = this.representations.get(buildConnectPoint)) == null || !internalStatisticRepresentation.submit(flowEntry)) {
            return;
        }
        updatePublishedStats(buildConnectPoint, internalStatisticRepresentation.get());
    }

    private synchronized void updatePublishedStats(ConnectPoint connectPoint, Set<FlowEntry> set) {
        Set<FlowEntry> set2 = this.current.get(connectPoint);
        if (set2 == null) {
            set2 = new HashSet();
        }
        this.previous.put(connectPoint, set2);
        this.current.put(connectPoint, set);
    }

    public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
        DeviceId deviceId = connectPoint.deviceId();
        ReplicaInfo replicaInfoFor = this.replicaInfoManager.getReplicaInfoFor(deviceId);
        if (!replicaInfoFor.master().isPresent()) {
            this.log.warn("No master for {}", deviceId);
            return Collections.emptySet();
        }
        if (((NodeId) replicaInfoFor.master().get()).equals(this.clusterService.getLocalNode().id())) {
            return getCurrentStatisticInternal(connectPoint);
        }
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        MessageSubject messageSubject = StatisticStoreMessageSubjects.GET_CURRENT;
        KryoSerializer kryoSerializer = SERIALIZER;
        kryoSerializer.getClass();
        Function function = (v1) -> {
            return r3.encode(v1);
        };
        KryoSerializer kryoSerializer2 = SERIALIZER;
        kryoSerializer2.getClass();
        return (Set) Tools.futureGetOrElse(clusterCommunicationService.sendAndReceive(connectPoint, messageSubject, function, kryoSerializer2::decode, (NodeId) replicaInfoFor.master().get()), STATISTIC_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, Collections.emptySet());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
        return this.current.get(connectPoint);
    }

    public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
        DeviceId deviceId = connectPoint.deviceId();
        ReplicaInfo replicaInfoFor = this.replicaInfoManager.getReplicaInfoFor(deviceId);
        if (!replicaInfoFor.master().isPresent()) {
            this.log.warn("No master for {}", deviceId);
            return Collections.emptySet();
        }
        if (((NodeId) replicaInfoFor.master().get()).equals(this.clusterService.getLocalNode().id())) {
            return getPreviousStatisticInternal(connectPoint);
        }
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        MessageSubject messageSubject = StatisticStoreMessageSubjects.GET_PREVIOUS;
        KryoSerializer kryoSerializer = SERIALIZER;
        kryoSerializer.getClass();
        Function function = (v1) -> {
            return r3.encode(v1);
        };
        KryoSerializer kryoSerializer2 = SERIALIZER;
        kryoSerializer2.getClass();
        return (Set) Tools.futureGetOrElse(clusterCommunicationService.sendAndReceive(connectPoint, messageSubject, function, kryoSerializer2::decode, (NodeId) replicaInfoFor.master().get()), STATISTIC_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, Collections.emptySet());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
        return this.previous.get(connectPoint);
    }

    private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint connectPoint) {
        if (this.representations.containsKey(connectPoint)) {
            return this.representations.get(connectPoint);
        }
        InternalStatisticRepresentation internalStatisticRepresentation = new InternalStatisticRepresentation();
        this.representations.put(connectPoint, internalStatisticRepresentation);
        return internalStatisticRepresentation;
    }

    private ConnectPoint buildConnectPoint(FlowRule flowRule) {
        PortNumber output = getOutput(flowRule);
        if (output == null) {
            return null;
        }
        return new ConnectPoint(flowRule.deviceId(), output);
    }

    private PortNumber getOutput(FlowRule flowRule) {
        for (Instructions.OutputInstruction outputInstruction : flowRule.treatment().allInstructions()) {
            if (outputInstruction.type() == Instruction.Type.OUTPUT) {
                return outputInstruction.port();
            }
            if (outputInstruction.type() == Instruction.Type.DROP) {
                return PortNumber.P0;
            }
        }
        return null;
    }

    protected void bindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        this.replicaInfoManager = replicaInfoService;
    }

    protected void unbindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        if (this.replicaInfoManager == replicaInfoService) {
            this.replicaInfoManager = null;
        }
    }

    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }
}
