package org.onosproject.store.packet.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true)
/* loaded from: input_file:org/onosproject/store/packet/impl/DistributedPacketStore.class */
public class DistributedPacketStore extends AbstractStore<PacketEvent, PacketStoreDelegate> implements PacketStore {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService communicationService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ComponentConfigService cfgService;
    private PacketRequestTracker tracker;
    private ExecutorService messageHandlingExecutor;
    private static final int MAX_BACKOFF = 50;
    private static final MessageSubject PACKET_OUT_SUBJECT = new MessageSubject("packet-out");
    private static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
    private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;

    @Property(name = "messageHandlerThreadPoolSize", intValue = {DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE}, label = "Size of thread pool to assign message handler")
    private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;

    /* loaded from: input_file:org/onosproject/store/packet/impl/DistributedPacketStore$PacketRequestTracker.class */
    private final class PacketRequestTracker {
        private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;

        private PacketRequestTracker() {
            this.requests = DistributedPacketStore.this.storageService.consistentMapBuilder().withName("onos-packet-requests").withSerializer(Serializer.using(KryoNamespaces.API)).build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void add(PacketRequest packetRequest) {
            if (!addInternal(packetRequest).get() || DistributedPacketStore.this.delegate == null) {
                return;
            }
            DistributedPacketStore.this.delegate.requestPackets(packetRequest);
        }

        private AtomicBoolean addInternal(PacketRequest packetRequest) {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            this.requests.compute(packetRequest.selector(), (trafficSelector, set) -> {
                atomicBoolean.set(false);
                if (set == null) {
                    atomicBoolean.set(true);
                    return ImmutableSet.of(packetRequest);
                }
                if (set.contains(packetRequest)) {
                    return set;
                }
                atomicBoolean.set(true);
                return ImmutableSet.builder().addAll(set).add(packetRequest).build();
            });
            return atomicBoolean;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void remove(PacketRequest packetRequest) {
            if (!removeInternal(packetRequest).get() || DistributedPacketStore.this.delegate == null) {
                return;
            }
            DistributedPacketStore.this.delegate.cancelPackets(packetRequest);
        }

        private AtomicBoolean removeInternal(PacketRequest packetRequest) {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            this.requests.computeIfPresent(packetRequest.selector(), (trafficSelector, set) -> {
                atomicBoolean.set(false);
                if (!set.contains(packetRequest)) {
                    return set;
                }
                HashSet newHashSet = Sets.newHashSet(set);
                newHashSet.remove(packetRequest);
                if (newHashSet.size() > 0) {
                    return ImmutableSet.copyOf(newHashSet);
                }
                atomicBoolean.set(true);
                return null;
            });
            return atomicBoolean;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<PacketRequest> requests() {
            ArrayList newArrayList = Lists.newArrayList();
            this.requests.values().forEach(versioned -> {
                newArrayList.addAll((Collection) versioned.value());
            });
            newArrayList.sort((packetRequest, packetRequest2) -> {
                return packetRequest.priority().priorityValue() - packetRequest2.priority().priorityValue();
            });
            return newArrayList;
        }
    }

    @Activate
    public void activate(ComponentContext componentContext) {
        this.cfgService.registerProperties(getClass());
        modified(componentContext);
        this.messageHandlingExecutor = Executors.newFixedThreadPool(messageHandlerThreadPoolSize, Tools.groupedThreads("onos/store/packet", "message-handlers", this.log));
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        MessageSubject messageSubject = PACKET_OUT_SUBJECT;
        StoreSerializer storeSerializer = SERIALIZER;
        storeSerializer.getClass();
        clusterCommunicationService.addSubscriber(messageSubject, storeSerializer::decode, outboundPacket -> {
            notifyDelegate(new PacketEvent(PacketEvent.Type.EMIT, outboundPacket));
        }, this.messageHandlingExecutor);
        this.tracker = new PacketRequestTracker();
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() {
        this.cfgService.unregisterProperties(getClass(), false);
        this.communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
        this.messageHandlingExecutor.shutdown();
        this.tracker = null;
        this.log.info("Stopped");
    }

    @Modified
    public void modified(ComponentContext componentContext) {
        int i;
        try {
            String str = Tools.get(componentContext != null ? componentContext.getProperties() : new Properties(), "messageHandlerThreadPoolSize");
            i = Strings.isNullOrEmpty(str) ? messageHandlerThreadPoolSize : Integer.parseInt(str.trim());
        } catch (NumberFormatException e) {
            this.log.warn(e.getMessage());
            i = messageHandlerThreadPoolSize;
        }
        if (i != messageHandlerThreadPoolSize) {
            setMessageHandlerThreadPoolSize(i);
            restartMessageHandlerThreadPool();
        }
        this.log.info(FORMAT, Integer.valueOf(messageHandlerThreadPoolSize));
    }

    public void emit(OutboundPacket outboundPacket) {
        NodeId id = this.clusterService.getLocalNode().id();
        NodeId masterFor = this.mastershipService.getMasterFor(outboundPacket.sendThrough());
        if (masterFor == null) {
            return;
        }
        if (id.equals(masterFor)) {
            notifyDelegate(new PacketEvent(PacketEvent.Type.EMIT, outboundPacket));
            return;
        }
        ClusterCommunicationService clusterCommunicationService = this.communicationService;
        MessageSubject messageSubject = PACKET_OUT_SUBJECT;
        StoreSerializer storeSerializer = SERIALIZER;
        storeSerializer.getClass();
        clusterCommunicationService.unicast(outboundPacket, messageSubject, (v1) -> {
            return r3.encode(v1);
        }, masterFor).whenComplete((r7, th) -> {
            if (th != null) {
                this.log.warn("Failed to send packet-out to {}", masterFor, th);
            }
        });
    }

    public void requestPackets(PacketRequest packetRequest) {
        this.tracker.add(packetRequest);
    }

    public void cancelPackets(PacketRequest packetRequest) {
        this.tracker.remove(packetRequest);
    }

    public List<PacketRequest> existingRequests() {
        return this.tracker.requests();
    }

    private void setMessageHandlerThreadPoolSize(int i) {
        Preconditions.checkArgument(i >= 0, "Message handler pool size must be 0 or more");
        messageHandlerThreadPoolSize = i;
    }

    private void restartMessageHandlerThreadPool() {
        ExecutorService executorService = this.messageHandlingExecutor;
        this.messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize(), Tools.groupedThreads("DistPktStore", "messageHandling-%d", this.log));
        executorService.shutdown();
    }

    private int getMessageHandlerThreadPoolSize() {
        return messageHandlerThreadPoolSize;
    }

    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService == mastershipService) {
            this.mastershipService = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        this.communicationService = clusterCommunicationService;
    }

    protected void unbindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        if (this.communicationService == clusterCommunicationService) {
            this.communicationService = null;
        }
    }

    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService == storageService) {
            this.storageService = null;
        }
    }

    protected void bindCfgService(ComponentConfigService componentConfigService) {
        this.cfgService = componentConfigService;
    }

    protected void unbindCfgService(ComponentConfigService componentConfigService) {
        if (this.cfgService == componentConfigService) {
            this.cfgService = null;
        }
    }
}
