package org.onosproject.store.packet.impl;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
@Component(immediate = true)
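/*
 * Packet store that hands outbound packets to the cluster node acting as master for the
 * target device and keeps the set of packet requests in a consistent map.
 * Decompiled from: org/onosproject/store/packet/impl/DistributedPacketStore.class
 */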
public class DistributedPacketStore extends AbstractStore<PacketEvent, PacketStoreDelegate> implements PacketStore {
    // Per-instance logger, named after the runtime class of this store.
    private final Logger log = LoggerFactory.getLogger(getClass());
    // Size of the fixed thread pool that runs handlers for incoming packet-out messages.
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;

    // Queried in emit() for the master node of the device a packet is sent through.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    // Queried in emit() for the identifier of the local node.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    // Carries packet-out messages between nodes (subscribe in activate(), unicast in emit()).
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService communicationService;

    // Supplies the consistent map that backs PacketRequestTracker.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;
    // Created in activate(); null before activation.
    private PacketRequestTracker tracker;
    // Subject under which outbound packets are exchanged between cluster nodes.
    private static final MessageSubject PACKET_OUT_SUBJECT = new MessageSubject("packet-out");
    // Serializer used to encode/decode OutboundPacket payloads of packet-out messages.
    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
        // Replaces the default pool with one built from the API namespace.
        protected void setupKryoPool() {
            // NOTE(review): nextId(300) presumably sets the next Kryo registration id; no
            // further types are registered after it here - confirm it is still needed.
            this.serializerPool = KryoNamespace.newBuilder().register(KryoNamespaces.API).nextId(300).build();
        }
    };
    // Thread pool handed to the communication service in activate(); shut down in deactivate().
    private ExecutorService messageHandlingExecutor;

    /* loaded from: input_file:org/onosproject/store/packet/impl/DistributedPacketStore$InternalClusterMessageHandler.class */
    private class InternalClusterMessageHandler implements ClusterMessageHandler {
        private InternalClusterMessageHandler() {
        }

        public void handle(ClusterMessage clusterMessage) {
            if (!clusterMessage.subject().equals(DistributedPacketStore.PACKET_OUT_SUBJECT)) {
                DistributedPacketStore.this.log.warn("Received message with wrong subject: {}", clusterMessage);
            }
            DistributedPacketStore.this.notifyDelegate(new PacketEvent(PacketEvent.Type.EMIT, (OutboundPacket) DistributedPacketStore.SERIALIZER.decode(clusterMessage.payload())));
        }
    }

    /**
     * Keeps the set of packet requests in a consistent map named {@code "packet-requests"}.
     * Only the keys carry information; every entry is stored with the value {@code true}.
     */
    private class PacketRequestTracker {
        private final ConsistentMap<PacketRequest, Boolean> requests;

        /**
         * Builds the backing map through the enclosing store's storage service.
         */
        public PacketRequestTracker() {
            // The explicit type witness is required: without it the chained builder calls
            // lose the key/value types and the result cannot be assigned to the typed field.
            this.requests = DistributedPacketStore.this.storageService.<PacketRequest, Boolean>consistentMapBuilder()
                    .withName("packet-requests")
                    .withPartitionsDisabled()
                    // Single serializer registration; an earlier duplicate withSerializer call
                    // that was immediately followed by this one has been dropped.
                    .withSerializer(new Serializer() {
                        private final KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build();

                        public <T> byte[] encode(T t) {
                            return this.kryo.serialize(t);
                        }

                        public <T> T decode(byte[] bArr) {
                            return (T) this.kryo.deserialize(bArr);
                        }
                    })
                    .build();
        }

        /**
         * Records a packet request if it is not already present.
         *
         * @param packetRequest request to record
         * @return true if the map had no entry for the request before this call
         */
        public boolean add(PacketRequest packetRequest) {
            return this.requests.putIfAbsent(packetRequest, true) == null;
        }

        /**
         * Removes a packet request.
         *
         * @param packetRequest request to remove
         * @return true if the map reported a previous entry for the request
         */
        public boolean remove(PacketRequest packetRequest) {
            return this.requests.remove(packetRequest) != null;
        }

        /**
         * Returns the recorded packet requests.
         *
         * @return key set of the backing map
         */
        public Set<PacketRequest> requests() {
            return this.requests.keySet();
        }
    }

    /**
     * Starts the store: creates the message-handling thread pool, subscribes to
     * packet-out messages from peer nodes and creates the packet request tracker.
     */
    @Activate
    public void activate() {
        messageHandlingExecutor = Executors.newFixedThreadPool(
                MESSAGE_HANDLER_THREAD_POOL_SIZE,
                Tools.groupedThreads("onos/store/packet", "message-handlers"));

        ClusterMessageHandler packetOutHandler = new InternalClusterMessageHandler();
        communicationService.addSubscriber(PACKET_OUT_SUBJECT, packetOutHandler, messageHandlingExecutor);

        tracker = new PacketRequestTracker();
        log.info("Started");
    }

    /**
     * Stops the store: unsubscribes from packet-out messages first, then initiates
     * shutdown of the message-handling thread pool (without waiting for termination).
     */
    @Deactivate
    public void deactivate() {
        communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
        messageHandlingExecutor.shutdown();
        log.info("Stopped");
    }

    /**
     * Emits an outbound packet through the node that is master for the device the
     * packet is sent through.
     * <p>
     * If no master is known the packet is dropped silently. If the local node is the
     * master the delegate is notified directly; otherwise the packet is serialized and
     * unicast to the master node under the packet-out subject.
     *
     * @param outboundPacket packet to emit
     */
    @Override
    public void emit(OutboundPacket outboundPacket) {
        NodeId localId = this.clusterService.getLocalNode().id();
        NodeId master = this.mastershipService.getMasterFor(outboundPacket.sendThrough());
        if (master == null) {
            // No master for the device: nothing can send the packet.
            return;
        }
        if (localId.equals(master)) {
            notifyDelegate(new PacketEvent(PacketEvent.Type.EMIT, outboundPacket));
            return;
        }
        // Hand the packet to the master node, encoding it with the shared serializer.
        this.communicationService.unicast(outboundPacket, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
    }

    /**
     * Records a packet request in the tracker.
     *
     * @param packetRequest request to record
     * @return true if the tracker did not already hold the request
     */
    public boolean requestPackets(PacketRequest packetRequest) {
        boolean newlyAdded = tracker.add(packetRequest);
        return newlyAdded;
    }

    /**
     * Returns the packet requests currently held by the tracker.
     *
     * @return set of recorded packet requests
     */
    public Set<PacketRequest> existingRequests() {
        Set<PacketRequest> recorded = tracker.requests();
        return recorded;
    }

    /**
     * Stores the mastership service reference.
     * NOTE(review): presumably invoked by the declarative-services runtime - confirm.
     *
     * @param mastershipService service instance to use
     */
    protected void bindMastershipService(MastershipService mastershipService) {
        this.mastershipService = mastershipService;
    }

    /**
     * Clears the mastership service reference, but only if it is the given instance.
     *
     * @param mastershipService service instance being withdrawn
     */
    protected void unbindMastershipService(MastershipService mastershipService) {
        if (this.mastershipService != mastershipService) {
            return;
        }
        this.mastershipService = null;
    }

    /**
     * Stores the cluster service reference.
     * NOTE(review): presumably invoked by the declarative-services runtime - confirm.
     *
     * @param clusterService service instance to use
     */
    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    /**
     * Clears the cluster service reference, but only if it is the given instance.
     *
     * @param clusterService service instance being withdrawn
     */
    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService != clusterService) {
            return;
        }
        this.clusterService = null;
    }

    /**
     * Stores the cluster communication service reference.
     * NOTE(review): presumably invoked by the declarative-services runtime - confirm.
     *
     * @param clusterCommunicationService service instance to use
     */
    protected void bindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        this.communicationService = clusterCommunicationService;
    }

    /**
     * Clears the cluster communication service reference, but only if it is the given instance.
     *
     * @param clusterCommunicationService service instance being withdrawn
     */
    protected void unbindCommunicationService(ClusterCommunicationService clusterCommunicationService) {
        if (this.communicationService != clusterCommunicationService) {
            return;
        }
        this.communicationService = null;
    }

    /**
     * Stores the storage service reference.
     * NOTE(review): presumably invoked by the declarative-services runtime - confirm.
     *
     * @param storageService service instance to use
     */
    protected void bindStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    /**
     * Clears the storage service reference, but only if it is the given instance.
     *
     * @param storageService service instance being withdrawn
     */
    protected void unbindStorageService(StorageService storageService) {
        if (this.storageService != storageService) {
            return;
        }
        this.storageService = null;
    }
}
