package org.onosproject.store.flowext.impl;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flowext.DefaultFlowRuleExt;
import org.onosproject.net.flowext.DownStreamFlowEntry;
import org.onosproject.net.flowext.FlowExtCompletedOperation;
import org.onosproject.net.flowext.FlowRuleExtRouter;
import org.onosproject.net.flowext.FlowRuleExtRouterListener;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Routes extended flow-rule batches to the cluster node that is master for the
 * batch's device.
 *
 * <p>A batch whose device is mastered by the local node is handed to the registered
 * {@link FlowRuleExtRouterListener}s; any other batch is serialized and sent to the
 * master node under the {@code APPLY_EXTEND_FLOWS} subject, and the master's reply is
 * decoded into a {@link FlowExtCompletedOperation}.
 *
 * <p>Locally applied batches are tracked in a cache of pending futures keyed by batch
 * id. A future is completed by {@link #batchOperationComplete(FlowRuleBatchEvent)};
 * entries that are not completed within the cache's expiry window are dropped from the
 * cache and are then no longer completed by this class.
 */
@Service
@Component(immediate = true, enabled = false)
public class DefaultFlowRuleExtRouter implements FlowRuleExtRouter {
    /** Size of the thread pool that services incoming APPLY_EXTEND_FLOWS messages. */
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;

    /** Executor handed to the cluster communicator for message handling; created in {@link #activate()}. */
    private ExecutorService messageHandlingExecutor;

    /** Serializer used for every payload exchanged between router instances. */
    protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
        protected void setupKryoPool() {
            this.serializerPool = KryoNamespace.newBuilder()
                    .register(DistributedStoreSerializers.STORE_COMMON)
                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
                    .register(new Class[]{FlowExtCompletedOperation.class})
                    .register(new Class[]{FlowRuleBatchRequest.class})
                    .register(new Class[]{DownStreamFlowEntry.class})
                    .register(new Class[]{DefaultFlowRuleExt.class})
                    .build();
        }
    };

    // NOTE(review): nothing in this class ever assigns this field, so it is always null.
    private ReplicaInfoEventListener replicaInfoEventListener;

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Minutes after which an uncompleted pending future is evicted from the cache. */
    private int pendingFutureTimeoutMinutes = 5;

    /** Listeners that are told about every batch that must be applied on this node. */
    protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<FlowRuleExtRouterListener>();

    /** Futures of locally applied batches that have not completed yet, keyed by batch id. */
    private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
                    .build();

    /** Executor on which replies to remote peers are encoded and sent. */
    private final ExecutorService futureListeners =
            Executors.newCachedThreadPool(Tools.groupedThreads("onos/flow", "store-peer-responders"));

    /**
     * Starts the message-handling pool and subscribes to batch requests forwarded by
     * other cluster nodes.
     */
    @Activate
    public void activate() {
        messageHandlingExecutor = Executors.newFixedThreadPool(
                MESSAGE_HANDLER_THREAD_POOL_SIZE,
                Tools.groupedThreads("onos/flow", "message-handlers"));

        clusterCommunicator.addSubscriber(FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS,
                new ClusterMessageHandler() {
                    public void handle(final ClusterMessage clusterMessage) {
                        FlowRuleBatchRequest request =
                                (FlowRuleBatchRequest) SERIALIZER.decode(clusterMessage.payload());
                        log.info("received batch request {}", request);

                        final ListenableFuture<FlowExtCompletedOperation> result = applyBatchInternal(request);
                        // Reply to the sender only once the local batch has finished.
                        result.addListener(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    clusterMessage.respond(SERIALIZER.encode(Futures.getUnchecked(result)));
                                } catch (IOException e) {
                                    log.error("Failed to respond back", e);
                                }
                            }
                        }, futureListeners);
                    }
                }, messageHandlingExecutor);

        // The listener field is never populated by this class; never hand null to the registry.
        if (replicaInfoEventListener != null) {
            replicaInfoManager.addListener(replicaInfoEventListener);
        }
        log.info("Started");
    }

    /**
     * Unsubscribes from forwarded batch requests and shuts the message-handling pool down.
     */
    @Deactivate
    public void deactivate() {
        clusterCommunicator.removeSubscriber(FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS);
        messageHandlingExecutor.shutdown();
        if (replicaInfoEventListener != null) {
            replicaInfoManager.removeListener(replicaInfoEventListener);
        }
        log.info("Stopped");
    }

    /**
     * Applies a batch on the node that is master for the batch's device.
     *
     * @param flowRuleBatchRequest the batch to apply; all operations must target one device
     * @return a future for the outcome: already successful for an empty batch, failed with
     *         an {@link IOException} when the device has no master or the request cannot be
     *         sent to it; {@code null} when the operations target more than one device
     */
    public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest flowRuleBatchRequest) {
        if (flowRuleBatchRequest.ops().isEmpty()) {
            return Futures.immediateFuture(new FlowExtCompletedOperation(
                    flowRuleBatchRequest.batchId(), true, Collections.emptySet()));
        }

        DeviceId deviceId = getBatchDeviceId(flowRuleBatchRequest.ops());
        if (deviceId == null) {
            log.error("This Batch exists more than two deviceId");
            return null;
        }

        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
        if (!replicaInfo.master().isPresent()) {
            // Report the missing master through the future, like the send failure below,
            // instead of letting Optional.get() throw synchronously.
            log.warn("No master for device {}, batch not applied", deviceId);
            return Futures.immediateFailedFuture(new IOException("No master for device " + deviceId));
        }

        NodeId master = (NodeId) replicaInfo.master().get();
        if (master.equals(clusterService.getLocalNode().id())) {
            return applyBatchInternal(flowRuleBatchRequest);
        }

        log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
        try {
            return Futures.transform(
                    clusterCommunicator.sendAndReceive(
                            new ClusterMessage(clusterService.getLocalNode().id(),
                                    FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS,
                                    SERIALIZER.encode(flowRuleBatchRequest)),
                            master),
                    new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
        } catch (IOException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    /**
     * Registers a pending future for the batch and notifies the listeners so the batch
     * is applied on this node.
     *
     * @param flowRuleBatchRequest the batch to apply locally
     * @return a future that is completed by {@link #batchOperationComplete(FlowRuleBatchEvent)}
     */
    public ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest flowRuleBatchRequest) {
        SettableFuture<FlowExtCompletedOperation> pending = SettableFuture.create();
        // Register before notifying so a completion event can always find the future.
        pendingExtendFutures.put(Long.valueOf(flowRuleBatchRequest.batchId()), pending);
        notify(flowRuleBatchRequest);
        return pending;
    }

    /**
     * Determines the single device that every entry of the batch targets.
     *
     * @param collection the batch entries; must not be empty
     * @return the common device id, or {@code null} when the entries do not all target
     *         the same device
     */
    private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> collection) {
        DeviceId expected = ((FlowRule) collection.iterator().next().target()).deviceId();
        for (FlowRuleBatchEntry entry : collection) {
            DeviceId actual = ((FlowRule) entry.target()).deviceId();
            // Compare by value: equal ids are not guaranteed to be the same instance.
            boolean same = (expected == null) ? (actual == null) : expected.equals(actual);
            if (!same) {
                log.warn("this batch does not apply on one device Id ");
                return null;
            }
        }
        return expected;
    }

    /**
     * Sends a "requested" batch event, built without a device id, to every registered listener.
     *
     * @param flowRuleBatchRequest the batch the listeners should act on
     */
    public void notify(FlowRuleBatchRequest flowRuleBatchRequest) {
        for (FlowRuleExtRouterListener listener : routerListener) {
            listener.notify(FlowRuleBatchEvent.requested(flowRuleBatchRequest, (DeviceId) null));
        }
    }

    /**
     * Completes the pending future of the batch the event refers to, if one is still tracked.
     *
     * @param flowRuleBatchEvent the event carrying the finished batch and its result
     */
    public void batchOperationComplete(FlowRuleBatchEvent flowRuleBatchEvent) {
        FlowRuleBatchRequest request = (FlowRuleBatchRequest) flowRuleBatchEvent.subject();
        Long batchId = Long.valueOf(request.batchId());

        SettableFuture<FlowExtCompletedOperation> pending = pendingExtendFutures.getIfPresent(batchId);
        if (pending == null) {
            // Unknown or already expired batch: nothing is waiting for this result.
            return;
        }

        CompletedBatchOperation result = flowRuleBatchEvent.result();
        pending.set(new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems()));
        pendingExtendFutures.invalidate(batchId);
    }

    /**
     * Registers a listener that is notified of batches to apply on this node.
     *
     * @param flowRuleExtRouterListener the listener to add
     */
    public void addListener(FlowRuleExtRouterListener flowRuleExtRouterListener) {
        routerListener.add(flowRuleExtRouterListener);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param flowRuleExtRouterListener the listener to remove
     */
    public void removeListener(FlowRuleExtRouterListener flowRuleExtRouterListener) {
        routerListener.remove(flowRuleExtRouterListener);
    }

    protected void bindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        this.replicaInfoManager = replicaInfoService;
    }

    protected void unbindReplicaInfoManager(ReplicaInfoService replicaInfoService) {
        // Only clear the reference if it still points at the service being unbound.
        if (this.replicaInfoManager == replicaInfoService) {
            this.replicaInfoManager = null;
        }
    }

    protected void bindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        this.clusterCommunicator = clusterCommunicationService;
    }

    protected void unbindClusterCommunicator(ClusterCommunicationService clusterCommunicationService) {
        if (this.clusterCommunicator == clusterCommunicationService) {
            this.clusterCommunicator = null;
        }
    }

    protected void bindClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    protected void unbindClusterService(ClusterService clusterService) {
        if (this.clusterService == clusterService) {
            this.clusterService = null;
        }
    }

    protected void bindDeviceService(DeviceService deviceService) {
        this.deviceService = deviceService;
    }

    protected void unbindDeviceService(DeviceService deviceService) {
        if (this.deviceService == deviceService) {
            this.deviceService = null;
        }
    }
}
