package org.onosproject.store.flow.impl;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.OsgiPropertyConstants;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.flow.impl.LifecycleEvent;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/onosproject/store/flow/impl/DeviceFlowTable.class */
public class DeviceFlowTable {
    private static final int NUM_BUCKETS = 1024;
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder().register(KryoNamespaces.API).register(new Class[]{BucketId.class}).register(new Class[]{FlowBucket.class}).register(new Class[]{FlowBucketDigest.class}).register(new Class[]{LogicalTimestamp.class}).register(new Class[]{Timestamped.class}).build());
    private final MessageSubject getDigestsSubject;
    private final MessageSubject getBucketSubject;
    private final MessageSubject backupSubject;
    private final DeviceId deviceId;
    private final ClusterCommunicationService clusterCommunicator;
    private final LifecycleManager lifecycleManager;
    private final ScheduledExecutorService executorService;
    private final NodeId localNodeId;
    private volatile DeviceReplicaInfo replicaInfo;
    private volatile long activeTerm;
    private ScheduledFuture<?> backupFuture;
    private ScheduledFuture<?> antiEntropyFuture;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final LogicalClock clock = new LogicalClock();
    private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() { // from class: org.onosproject.store.flow.impl.DeviceFlowTable.1
        public void event(LifecycleEvent lifecycleEvent) {
            DeviceFlowTable.this.executorService.execute(() -> {
                DeviceFlowTable.this.onLifecycleEvent(lifecycleEvent);
            });
        }
    };
    private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
    private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
    private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
    private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.onosproject.store.flow.impl.DeviceFlowTable$2, reason: invalid class name */
    /* loaded from: input_file:org/onosproject/store/flow/impl/DeviceFlowTable$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$onosproject$store$flow$impl$LifecycleEvent$Type = new int[LifecycleEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$onosproject$store$flow$impl$LifecycleEvent$Type[LifecycleEvent.Type.TERM_START.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$onosproject$store$flow$impl$LifecycleEvent$Type[LifecycleEvent.Type.TERM_ACTIVE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$onosproject$store$flow$impl$LifecycleEvent$Type[LifecycleEvent.Type.TERM_UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DeviceFlowTable(DeviceId deviceId, ClusterService clusterService, ClusterCommunicationService clusterCommunicationService, LifecycleManager lifecycleManager, ScheduledExecutorService scheduledExecutorService, long j, long j2) {
        this.deviceId = deviceId;
        this.clusterCommunicator = clusterCommunicationService;
        this.lifecycleManager = lifecycleManager;
        this.executorService = scheduledExecutorService;
        this.localNodeId = clusterService.getLocalNode().id();
        addListeners();
        for (int i = 0; i < NUM_BUCKETS; i++) {
            this.flowBuckets.put(Integer.valueOf(i), new FlowBucket(new BucketId(deviceId, i)));
        }
        this.getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
        this.getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
        this.backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
        setBackupPeriod(j);
        setAntiEntropyPeriod(j2);
        registerSubscribers();
        startTerm(lifecycleManager.getReplicaInfo());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void setBackupPeriod(long j) {
        ScheduledFuture<?> scheduledFuture = this.backupFuture;
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
        }
        this.backupFuture = this.executorService.scheduleAtFixedRate(this::backup, j, j, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void setAntiEntropyPeriod(long j) {
        ScheduledFuture<?> scheduledFuture = this.antiEntropyFuture;
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
        }
        this.antiEntropyFuture = this.executorService.scheduleAtFixedRate(this::runAntiEntropy, j, j, TimeUnit.MILLISECONDS);
    }

    public int count() {
        return this.flowBuckets.values().stream().mapToInt((v0) -> {
            return v0.count();
        }).sum();
    }

    public StoredFlowEntry getFlowEntry(FlowRule flowRule) {
        return getBucket(flowRule.id()).getFlowEntries(flowRule.id()).get(flowRule);
    }

    public Set<FlowEntry> getFlowEntries() {
        return (Set) this.flowBuckets.values().stream().flatMap(flowBucket -> {
            return flowBucket.getFlowBucket().values().stream();
        }).flatMap(map -> {
            return map.values().stream();
        }).collect(Collectors.toSet());
    }

    private FlowBucket getBucket(FlowId flowId) {
        return getBucket(bucket(flowId));
    }

    private FlowBucket getBucket(int i) {
        return this.flowBuckets.get(Integer.valueOf(i));
    }

    private int bucket(FlowId flowId) {
        return Math.abs((int) (((Long) flowId.id()).longValue() % 1024));
    }

    private Set<FlowBucketDigest> getDigests() {
        return (Set) this.flowBuckets.values().stream().map(flowBucket -> {
            return flowBucket.getDigest();
        }).collect(Collectors.toSet());
    }

    private FlowBucketDigest getDigest(int i) {
        return this.flowBuckets.get(Integer.valueOf(i)).getDigest();
    }

    public CompletableFuture<Void> add(FlowEntry flowEntry) {
        return runInTerm(flowEntry.id(), (flowBucket, l) -> {
            flowBucket.add(flowEntry, l.longValue(), this.clock);
            return null;
        });
    }

    public CompletableFuture<Void> update(FlowEntry flowEntry) {
        return runInTerm(flowEntry.id(), (flowBucket, l) -> {
            flowBucket.update(flowEntry, l.longValue(), this.clock);
            return null;
        });
    }

    public <T> CompletableFuture<T> update(FlowRule flowRule, Function<StoredFlowEntry, T> function) {
        return runInTerm(flowRule.id(), (flowBucket, l) -> {
            return flowBucket.update(flowRule, function, l.longValue(), this.clock);
        });
    }

    public CompletableFuture<FlowEntry> remove(FlowEntry flowEntry) {
        return runInTerm(flowEntry.id(), (flowBucket, l) -> {
            return flowBucket.remove(flowEntry, l.longValue(), this.clock);
        });
    }

    private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> biFunction) {
        DeviceReplicaInfo replicaInfo = this.lifecycleManager.getReplicaInfo();
        if (!replicaInfo.isMaster(this.localNodeId)) {
            return Tools.exceptionalFuture(new IllegalStateException());
        }
        FlowBucket bucket = getBucket(flowId);
        long term = replicaInfo.term();
        if (this.activeTerm < term) {
            this.log.debug("Enqueueing operation for device {}", this.deviceId);
            synchronized (this.flowTasks) {
                if (this.activeTerm < term) {
                    CompletableFuture<T> completableFuture = new CompletableFuture<>();
                    this.flowTasks.computeIfAbsent(Integer.valueOf(bucket.bucketId().bucket()), num -> {
                        return new LinkedList();
                    }).add(() -> {
                        completableFuture.complete(biFunction.apply(bucket, Long.valueOf(term)));
                    });
                    return completableFuture;
                }
            }
        }
        return CompletableFuture.completedFuture(biFunction.apply(bucket, Long.valueOf(term)));
    }

    private void backup() {
        DeviceReplicaInfo replicaInfo = this.lifecycleManager.getReplicaInfo();
        if (replicaInfo.isMaster(this.localNodeId)) {
            for (NodeId nodeId : replicaInfo.backups()) {
                try {
                    backup(nodeId, replicaInfo.term());
                } catch (Exception e) {
                    this.log.error("Backup of " + this.deviceId + " to " + nodeId + " failed", e);
                }
            }
        }
    }

    private void backup(NodeId nodeId, long j) {
        for (FlowBucket flowBucket : this.flowBuckets.values()) {
            if (flowBucket.term() == j) {
                LogicalTimestamp timestamp = flowBucket.timestamp();
                BackupOperation backupOperation = new BackupOperation(nodeId, flowBucket.bucketId().bucket());
                if (startBackup(backupOperation, timestamp)) {
                    backup(flowBucket.copy(), nodeId).whenCompleteAsync((bool, th) -> {
                        if (th != null) {
                            this.log.debug("Backup operation {} failed", backupOperation, th);
                            failBackup(backupOperation);
                        } else if (bool.booleanValue()) {
                            succeedBackup(backupOperation, timestamp);
                            backup(nodeId, j);
                        } else {
                            this.log.debug("Backup operation {} failed: term mismatch", backupOperation);
                            failBackup(backupOperation);
                        }
                    }, (Executor) this.executorService);
                }
            }
        }
    }

    private boolean startBackup(BackupOperation backupOperation, LogicalTimestamp logicalTimestamp) {
        LogicalTimestamp logicalTimestamp2 = this.lastBackupTimes.get(backupOperation);
        return logicalTimestamp != null && (logicalTimestamp2 == null || logicalTimestamp2.isOlderThan(logicalTimestamp)) && this.inFlightUpdates.add(backupOperation);
    }

    private void failBackup(BackupOperation backupOperation) {
        this.inFlightUpdates.remove(backupOperation);
    }

    private void succeedBackup(BackupOperation backupOperation, LogicalTimestamp logicalTimestamp) {
        this.lastBackupTimes.put(backupOperation, logicalTimestamp);
        this.inFlightUpdates.remove(backupOperation);
    }

    private void resetBackup(BackupOperation backupOperation) {
        this.lastBackupTimes.remove(backupOperation);
    }

    private CompletableFuture<Boolean> backup(FlowBucket flowBucket, NodeId nodeId) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Backing up {} flow entries in bucket {} to {}", new Object[]{Integer.valueOf(flowBucket.count()), flowBucket.bucketId(), nodeId});
        }
        return sendWithTimestamp(flowBucket, this.backupSubject, nodeId);
    }

    private boolean onBackup(FlowBucket flowBucket) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("{} - Received {} flow entries in bucket {} to backup", new Object[]{this.deviceId, Integer.valueOf(flowBucket.count()), flowBucket.bucketId()});
        }
        try {
            DeviceReplicaInfo replicaInfo = this.lifecycleManager.getReplicaInfo();
            if (flowBucket.term() != replicaInfo.term()) {
                this.log.debug("Term mismatch for device {}: {} != {}", new Object[]{this.deviceId, Long.valueOf(flowBucket.term()), replicaInfo});
                return false;
            }
            this.flowBuckets.compute(Integer.valueOf(flowBucket.bucketId().bucket()), (num, flowBucket2) -> {
                return flowBucket.getDigest().isNewerThan(flowBucket2.getDigest()) ? flowBucket : flowBucket2;
            });
            return true;
        } catch (Exception e) {
            this.log.warn("Failure processing backup request", e);
            return false;
        }
    }

    private void runAntiEntropy() {
        DeviceReplicaInfo replicaInfo = this.lifecycleManager.getReplicaInfo();
        if (replicaInfo.isMaster(this.localNodeId)) {
            Iterator<NodeId> it = replicaInfo.backups().iterator();
            while (it.hasNext()) {
                runAntiEntropy(it.next());
            }
        }
    }

    private void runAntiEntropy(NodeId nodeId) {
        requestDigests(nodeId).thenAcceptAsync(set -> {
            Iterator it = set.iterator();
            while (it.hasNext()) {
                FlowBucketDigest flowBucketDigest = (FlowBucketDigest) it.next();
                if (getBucket(flowBucketDigest.bucket()).getDigest().isNewerThan(flowBucketDigest)) {
                    this.log.debug("Detected missing flow entries on node {} in bucket {}/{}", new Object[]{nodeId, this.deviceId, Integer.valueOf(flowBucketDigest.bucket())});
                    resetBackup(new BackupOperation(nodeId, flowBucketDigest.bucket()));
                }
            }
        }, (Executor) this.executorService);
    }

    private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
        return sendWithTimestamp(this.deviceId, this.getDigestsSubject, nodeId);
    }

    private void syncFlows(DeviceReplicaInfo deviceReplicaInfo, DeviceReplicaInfo deviceReplicaInfo2) {
        if (deviceReplicaInfo == null) {
            activateMaster(deviceReplicaInfo2);
        } else if (deviceReplicaInfo.master() == null || deviceReplicaInfo.master().equals(this.localNodeId)) {
            syncFlowsOnBackups(deviceReplicaInfo, deviceReplicaInfo2);
        } else {
            syncFlowsOnMaster(deviceReplicaInfo, deviceReplicaInfo2);
        }
    }

    private void syncFlowsOnMaster(DeviceReplicaInfo deviceReplicaInfo, DeviceReplicaInfo deviceReplicaInfo2) {
        syncFlowsOn(deviceReplicaInfo.master()).whenCompleteAsync((r8, th) -> {
            if (th == null) {
                activateMaster(deviceReplicaInfo2);
            } else {
                this.log.debug("Failed to synchronize flows on previous master {}", deviceReplicaInfo.master(), th);
                syncFlowsOnBackups(deviceReplicaInfo, deviceReplicaInfo2);
            }
        }, (Executor) this.executorService);
    }

    private void syncFlowsOnBackups(DeviceReplicaInfo deviceReplicaInfo, DeviceReplicaInfo deviceReplicaInfo2) {
        List list = (List) deviceReplicaInfo.backups().stream().filter(nodeId -> {
            return !nodeId.equals(this.localNodeId);
        }).collect(Collectors.toList());
        syncFlowsOn(list).whenCompleteAsync((r8, th) -> {
            if (th != null) {
                this.log.debug("Failed to synchronize flows on previous backup nodes {}", list, th);
            }
            activateMaster(deviceReplicaInfo2);
        }, (Executor) this.executorService);
    }

    private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> collection) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(null) : Tools.firstOf((List) collection.stream().map(nodeId -> {
            return syncFlowsOn(nodeId);
        }).collect(Collectors.toList())).thenApply(r2 -> {
            return null;
        });
    }

    private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
        return requestDigests(nodeId).thenCompose(set -> {
            return Tools.allOf((List) set.stream().filter(flowBucketDigest -> {
                return flowBucketDigest.isNewerThan(getDigest(flowBucketDigest.bucket()));
            }).map(flowBucketDigest2 -> {
                return syncBucketOn(nodeId, flowBucketDigest2.bucket());
            }).collect(Collectors.toList()));
        }).thenApply((Function<? super U, ? extends U>) list -> {
            return null;
        });
    }

    private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int i) {
        return requestBucket(nodeId, i).thenAcceptAsync(flowBucket -> {
            this.flowBuckets.compute(Integer.valueOf(flowBucket.bucketId().bucket()), (num, flowBucket) -> {
                return flowBucket.getDigest().isNewerThan(flowBucket.getDigest()) ? flowBucket : flowBucket;
            });
        }, (Executor) this.executorService);
    }

    private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int i) {
        this.log.debug("Requesting flow bucket {} from {}", Integer.valueOf(i), nodeId);
        return sendWithTimestamp(Integer.valueOf(i), this.getBucketSubject, nodeId);
    }

    private FlowBucket onGetBucket(int i) {
        return this.flowBuckets.get(Integer.valueOf(i)).copy();
    }

    private void activateMaster(DeviceReplicaInfo deviceReplicaInfo) {
        this.log.debug("Activating term {} for device {}", Long.valueOf(deviceReplicaInfo.term()), this.deviceId);
        for (int i = 0; i < NUM_BUCKETS; i++) {
            activateBucket(i);
        }
        this.lifecycleManager.activate(deviceReplicaInfo.term());
        this.activeTerm = deviceReplicaInfo.term();
    }

    private void activateBucket(int i) {
        Queue<Runnable> remove;
        synchronized (this.flowTasks) {
            remove = this.flowTasks.remove(Integer.valueOf(i));
        }
        if (remove != null) {
            this.log.debug("Completing enqueued operations for device {}", this.deviceId);
            remove.forEach(runnable -> {
                runnable.run();
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onLifecycleEvent(LifecycleEvent lifecycleEvent) {
        this.log.debug("Received lifecycle event for device {}: {}", this.deviceId, lifecycleEvent);
        switch (AnonymousClass2.$SwitchMap$org$onosproject$store$flow$impl$LifecycleEvent$Type[((LifecycleEvent.Type) lifecycleEvent.type()).ordinal()]) {
            case 1:
                startTerm((DeviceReplicaInfo) lifecycleEvent.subject());
                return;
            case OsgiPropertyConstants.MAX_BACKUP_COUNT_DEFAULT /* 2 */:
                activateTerm((DeviceReplicaInfo) lifecycleEvent.subject());
                return;
            case 3:
                updateTerm((DeviceReplicaInfo) lifecycleEvent.subject());
                return;
            default:
                return;
        }
    }

    private void startTerm(DeviceReplicaInfo deviceReplicaInfo) {
        DeviceReplicaInfo deviceReplicaInfo2 = this.replicaInfo;
        this.replicaInfo = deviceReplicaInfo;
        if (deviceReplicaInfo.isMaster(this.localNodeId)) {
            this.log.info("Synchronizing device {} flows for term {}", this.deviceId, Long.valueOf(deviceReplicaInfo.term()));
            syncFlows(deviceReplicaInfo2, deviceReplicaInfo);
        }
    }

    private void activateTerm(DeviceReplicaInfo deviceReplicaInfo) {
        if (deviceReplicaInfo.term() < this.replicaInfo.term()) {
            return;
        }
        if (deviceReplicaInfo.term() > this.replicaInfo.term()) {
            this.replicaInfo = deviceReplicaInfo;
        }
        if (!deviceReplicaInfo.isMaster(this.localNodeId) && !deviceReplicaInfo.isBackup(this.localNodeId)) {
            this.flowBuckets.values().forEach(flowBucket -> {
                flowBucket.clear();
            });
        }
        this.activeTerm = deviceReplicaInfo.term();
    }

    private void updateTerm(DeviceReplicaInfo deviceReplicaInfo) {
        if (deviceReplicaInfo.term() == this.replicaInfo.term()) {
            this.replicaInfo = deviceReplicaInfo;
            if (this.activeTerm != deviceReplicaInfo.term() || deviceReplicaInfo.isMaster(this.localNodeId) || deviceReplicaInfo.isBackup(this.localNodeId)) {
                return;
            }
            this.flowBuckets.values().forEach(flowBucket -> {
                flowBucket.clear();
            });
        }
    }

    private <M, R> CompletableFuture<R> sendWithTimestamp(M m, MessageSubject messageSubject, NodeId nodeId) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        Timestamped timestamp = this.clock.timestamp(m);
        Serializer serializer = SERIALIZER;
        Objects.requireNonNull(serializer);
        Function function = (v1) -> {
            return r3.encode(v1);
        };
        Serializer serializer2 = SERIALIZER;
        Objects.requireNonNull(serializer2);
        return clusterCommunicationService.sendAndReceive(timestamp, messageSubject, function, serializer2::decode, nodeId).thenApply(timestamped -> {
            this.clock.tick(timestamped.timestamp());
            return timestamped.value();
        });
    }

    private <M, R> void receiveWithTimestamp(MessageSubject messageSubject, Function<M, R> function) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        Serializer serializer = SERIALIZER;
        Objects.requireNonNull(serializer);
        Function function2 = serializer::decode;
        Function function3 = timestamped -> {
            this.clock.tick(timestamped.timestamp());
            return this.clock.timestamp(function.apply(timestamped.value()));
        };
        Serializer serializer2 = SERIALIZER;
        Objects.requireNonNull(serializer2);
        clusterCommunicationService.addSubscriber(messageSubject, function2, function3, (v1) -> {
            return r4.encode(v1);
        }, this.executorService);
    }

    private void registerSubscribers() {
        receiveWithTimestamp(this.getDigestsSubject, obj -> {
            return getDigests();
        });
        receiveWithTimestamp(this.getBucketSubject, (v1) -> {
            return onGetBucket(v1);
        });
        receiveWithTimestamp(this.backupSubject, this::onBackup);
    }

    private void unregisterSubscribers() {
        this.clusterCommunicator.removeSubscriber(this.getDigestsSubject);
        this.clusterCommunicator.removeSubscriber(this.getBucketSubject);
        this.clusterCommunicator.removeSubscriber(this.backupSubject);
    }

    private void addListeners() {
        this.lifecycleManager.addListener(this.lifecycleEventListener);
    }

    private void removeListeners() {
        this.lifecycleManager.removeListener(this.lifecycleEventListener);
    }

    private synchronized void cancelFutures() {
        ScheduledFuture<?> scheduledFuture = this.backupFuture;
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
        }
        ScheduledFuture<?> scheduledFuture2 = this.antiEntropyFuture;
        if (scheduledFuture2 != null) {
            scheduledFuture2.cancel(false);
        }
    }

    public void purge() {
        this.flowTasks.clear();
        this.flowBuckets.values().forEach(flowBucket -> {
            flowBucket.purge();
        });
        this.lastBackupTimes.clear();
        this.inFlightUpdates.clear();
    }

    public void close() {
        removeListeners();
        unregisterSubscribers();
        cancelFutures();
        this.lifecycleManager.close();
    }
}
