package oracle.kv.impl.tif;

import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.subscription.Subscription;
import com.sleepycat.je.rep.subscription.SubscriptionConfig;
import com.sleepycat.je.rep.subscription.SubscriptionStatus;
import com.sleepycat.je.utilint.InternalException;
import com.sleepycat.je.utilint.VLSN;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import oracle.kv.impl.api.KVStoreImpl;
import oracle.kv.impl.api.SharedThreadPool;
import oracle.kv.impl.rep.subscription.partreader.PartitionReader;
import oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack;
import oracle.kv.impl.rep.subscription.partreader.PartitionReaderStatus;
import oracle.kv.impl.topo.PartitionId;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:oracle/kv/impl/tif/SubscriptionManager.class */
public class SubscriptionManager {
    public static final int MAX_CONCURRENT_PARTITION_READERS = 1;
    private final Logger logger;
    private final SourceRepNode sourceRN;
    private final HostRepNode hostRN;
    private final SubscriptionConfig config;
    private final Subscription repStreamConsumer;
    private final KVStoreImpl.TaskExecutor executor;
    private final BlockingQueue<DataItem> inputQueueRepStream;
    private final BlockingQueue<DataItem> inputQueuePartReader;
    private final Set<PartitionId> managedPartitions;
    private SubscriptionState state;
    private long topologySeq;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<PartitionId, PartitionReader> partitionReaderMap = new HashMap();
    private final Map<PartitionId, PartitionReaderCallBack> partReaderCbkMap = new HashMap();
    private final int dop = computeDOP();
    private final SubscriptionFilter filter = new SubscriptionFilter();

    /* loaded from: input_file:oracle/kv/impl/tif/SubscriptionManager$SubscriptionFilter.class */
    public class SubscriptionFilter {
        private Set<PartitionId> completePartitions = new HashSet();
        private AtomicLong numEntryFiltered = new AtomicLong(0);

        SubscriptionFilter() {
        }

        public Set<PartitionId> getCompletePartitions() {
            return this.completePartitions;
        }

        public void setCompletePartitions(Set<PartitionId> set) {
            this.completePartitions = set;
        }

        public long getNumEntryFiltered() {
            return this.numEntryFiltered.get();
        }

        public synchronized boolean addPartition(PartitionId partitionId) {
            return this.completePartitions.add(partitionId);
        }

        public synchronized boolean removePartition(PartitionId partitionId) {
            return this.completePartitions.remove(partitionId);
        }

        public DataItem filter(DataItem dataItem) {
            if (SubscriptionManager.this.state != SubscriptionState.PARTITION_TRANSFER) {
                return dataItem;
            }
            if (dataItem.isTxnAbort() || dataItem.isTxnCommit()) {
                return dataItem;
            }
            if (this.completePartitions.contains(SubscriptionManager.this.sourceRN.getPartitionId(dataItem.getKey()))) {
                return dataItem;
            }
            this.numEntryFiltered.incrementAndGet();
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SubscriptionManager(SourceRepNode sourceRepNode, HostRepNode hostRepNode, SubscriptionConfig subscriptionConfig, Logger logger) {
        this.sourceRN = sourceRepNode;
        this.hostRN = hostRepNode;
        this.config = subscriptionConfig;
        this.logger = logger;
        this.managedPartitions = new HashSet(sourceRepNode.getPartitionIdSet());
        this.topologySeq = sourceRepNode.getTopoSequence();
        this.executor = new SharedThreadPool(logger).getTaskExecutor(this.dop);
        this.inputQueueRepStream = new ArrayBlockingQueue(subscriptionConfig.getInputMessageQueueSize());
        this.inputQueuePartReader = new ArrayBlockingQueue(subscriptionConfig.getOutputMessageQueueSize());
        subscriptionConfig.setCallback(new FeederSubscriptionCbk(this.filter, this.inputQueueRepStream, logger));
        this.repStreamConsumer = new Subscription(subscriptionConfig, logger);
        this.state = SubscriptionState.READY;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startStream(VLSN vlsn) {
        this.state = SubscriptionState.REPLICATION_STREAM;
        String fullName = this.sourceRN.getRepNodeId().getFullName();
        this.logger.log(Level.INFO, "Start streaming from source node {0}, start vlsn: {1}", new Object[]{fullName, vlsn});
        try {
            this.repStreamConsumer.start(vlsn);
            this.logger.log(Level.INFO, "Subscription succeeded, requested vlsn {0} is available at {1}.", new Object[]{vlsn, fullName});
        } catch (IllegalArgumentException | GroupShutdownException | InternalException | TimeoutException e) {
            this.logger.log(Level.WARNING, "Unable to start replication due to error {0}", e.getMessage());
            this.repStreamConsumer.shutdown();
            this.state = SubscriptionState.ERROR;
        } catch (InsufficientLogException e2) {
            this.logger.log(Level.INFO, "Requested vlsn {0} is not available at {1}, switch to initial replication.", new Object[]{vlsn, fullName});
            this.repStreamConsumer.shutdown();
            startStream(this.managedPartitions);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startStream(Set<PartitionId> set) {
        this.state = SubscriptionState.PARTITION_TRANSFER;
        Iterator<PartitionId> it = set.iterator();
        while (it.hasNext()) {
            scheduleTransfer(it.next());
        }
        this.logger.log(Level.INFO, "All {0} partition receivers scheduled to transfer, with DOP {1}, partitions scheduled to transfer: {2}", new Object[]{Integer.valueOf(set.size()), Integer.valueOf(this.dop), partitionListToString(set)});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startStream(VLSN vlsn, Set<PartitionId> set) {
        if (!$assertionsDisabled && (set == null || set.size() <= 0)) {
            throw new AssertionError();
        }
        this.state = SubscriptionState.PARTITION_TRANSFER;
        this.filter.setCompletePartitions(set);
        this.logger.log(Level.INFO, "All {0} partition receivers scheduled to transfer, with DOP {1}", new Object[]{Integer.valueOf(set.size()), Integer.valueOf(this.dop)});
        try {
            this.repStreamConsumer.start(vlsn);
            if (!$assertionsDisabled && !this.repStreamConsumer.getSubscriptionStatus().equals(SubscriptionStatus.SUCCESS)) {
                throw new AssertionError();
            }
            this.logger.log(Level.INFO, "Successfully start rep stream consumer from vlsn {0}.", vlsn);
            Iterator<PartitionId> it = set.iterator();
            while (it.hasNext()) {
                scheduleTransfer(it.next());
            }
            this.logger.log(Level.INFO, "All {0} partitions scheduled to transfer with DOP {1}, list of partitions: {2}.", new Object[]{Integer.valueOf(set.size()), Integer.valueOf(this.dop), partitionListToString(set)});
        } catch (IllegalArgumentException | TimeoutException | GroupShutdownException | InternalException | InsufficientLogException e) {
            this.logger.log(Level.WARNING, "Unable to subscribe from VLSN {0}, reason: {1}", new Object[]{vlsn, e.getMessage()});
            shutdown(SubscriptionState.ERROR);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown(SubscriptionState subscriptionState) {
        for (Map.Entry<PartitionId, PartitionReader> entry : this.partitionReaderMap.entrySet()) {
            PartitionReader value = entry.getValue();
            this.logger.log(Level.INFO, "Shutdown receiver for {0} in state {1}", new Object[]{entry.getKey(), value.getStatus().getState()});
            value.shutdown();
        }
        this.logger.log(Level.INFO, "All partition readers shut down.");
        this.repStreamConsumer.shutdown();
        this.logger.log(Level.INFO, "Rep stream consumer shut down, all subscription activities stopped.");
        this.state = subscriptionState;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Subscription getRepStreamConsumer() {
        return this.repStreamConsumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getDOPForPartTransfer() {
        return this.dop;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SubscriptionFilter getSubscriptionFilter() {
        return this.filter;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SubscriptionState getState() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlockingQueue<DataItem> getInputQueueRepStream() {
        return this.inputQueueRepStream;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlockingQueue<DataItem> getInputQueuePartReader() {
        return this.inputQueuePartReader;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<PartitionId, PartitionReader> getPartitionReaderMap() {
        return this.partitionReaderMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isManangedPartition(PartitionId partitionId) {
        return this.managedPartitions.contains(partitionId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<PartitionId> getManagedPartitions() {
        return this.managedPartitions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getCurrentTopologySeq() {
        return this.topologySeq;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setCurrentTopologySeq(long j) {
        this.topologySeq = j;
    }

    synchronized void setPartitionReaderCallBack(PartitionId partitionId, PartitionReaderCallBack partitionReaderCallBack) {
        this.partReaderCbkMap.put(partitionId, partitionReaderCallBack);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void setSubscriptionState(SubscriptionState subscriptionState) {
        SubscriptionState subscriptionState2 = this.state;
        this.state = subscriptionState;
        this.logger.log(Level.INFO, "Subscription state is set from {0} to {1}", new Object[]{subscriptionState2, this.state});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean allPartComplete() {
        for (PartitionId partitionId : this.managedPartitions) {
            if (!this.partitionReaderMap.containsKey(partitionId) || this.partitionReaderMap.get(partitionId).getStatus().getState() != PartitionReaderStatus.PartitionRepState.DONE) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addPartition(PartitionId partitionId) {
        if (partitionId == null) {
            this.logger.log(Level.FINE, "Null partition, ignore");
        } else {
            if (this.managedPartitions.contains(partitionId)) {
                this.logger.log(Level.FINE, "Partition {0} already exist, ignore", partitionId);
                return;
            }
            this.managedPartitions.add(partitionId);
            scheduleTransfer(partitionId);
            this.logger.log(Level.INFO, "Partition {0} added into managed partitions: {1}", new Object[]{partitionId, partitionListToString(this.managedPartitions)});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void removePartition(PartitionId partitionId) {
        if (partitionId == null) {
            this.logger.log(Level.FINE, "Null partition, ignore");
            return;
        }
        if (!this.managedPartitions.contains(partitionId)) {
            this.logger.log(Level.FINE, "Partition {0} does not exist, ignore", partitionId);
            return;
        }
        PartitionReader partitionReader = this.partitionReaderMap.get(partitionId);
        if (partitionReader != null) {
            this.logger.log(Level.FINE, "Shutdown reader for {0} in state {1}", new Object[]{partitionId, partitionReader.getStatus().getState()});
            partitionReader.shutdown();
            this.filter.removePartition(partitionId);
        } else {
            this.logger.log(Level.FINE, "No reader for {0}, ignore.", partitionId);
        }
        this.managedPartitions.remove(partitionId);
        this.logger.log(Level.INFO, "Partition {0} removed from managed partitions: {1}", new Object[]{partitionId, partitionListToString(this.managedPartitions)});
    }

    private int computeDOP() {
        return Math.min(Math.min(this.managedPartitions.size(), this.sourceRN.getConcurrentSourceLimit()), 1);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v25, types: [oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack] */
    private synchronized void scheduleTransfer(PartitionId partitionId) {
        FeederPartReaderCbk feederPartReaderCbk;
        if (this.partitionReaderMap.containsKey(partitionId)) {
            PartitionReader partitionReader = this.partitionReaderMap.get(partitionId);
            PartitionReaderStatus.PartitionRepState state = partitionReader.getStatus().getState();
            if (state == PartitionReaderStatus.PartitionRepState.IDLE || state == PartitionReaderStatus.PartitionRepState.REPLICATING) {
                this.logger.log(Level.FINE, "Partition {0} scheduled to transfer, or in transfer, ignore (state: {1})", new Object[]{partitionId, state});
                return;
            } else {
                this.logger.log(Level.FINE, "Found existent reader for {0}, state: {1}.", new Object[]{partitionId, state});
                partitionReader.shutdown();
                this.partitionReaderMap.remove(partitionId);
            }
        }
        if (this.partReaderCbkMap.containsKey(partitionId)) {
            feederPartReaderCbk = this.partReaderCbkMap.get(partitionId);
            this.logger.log(Level.FINE, "Partition {0} uses client-defined cbk.", partitionId);
        } else {
            feederPartReaderCbk = new FeederPartReaderCbk(this, partitionId, this.logger);
        }
        PartitionReader partitionReader2 = new PartitionReader(this.hostRN.getRepEnv(), partitionId, feederPartReaderCbk, this.config, this.logger);
        this.partitionReaderMap.put(partitionId, partitionReader2);
        this.executor.submit(partitionReader2);
        this.logger.log(Level.FINE, "Partition {0} is scheduled to transfer.", partitionId);
        if (this.state.equals(SubscriptionState.PARTITION_TRANSFER)) {
            return;
        }
        this.logger.log(Level.INFO, "Subscription state changes from {0} to {1}.", new Object[]{this.state, SubscriptionState.PARTITION_TRANSFER});
        this.state = SubscriptionState.PARTITION_TRANSFER;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String partitionListToString(Set<PartitionId> set) {
        return (set == null || set.isEmpty()) ? "[]" : "[" + Arrays.toString(set.toArray()) + "]";
    }

    static {
        $assertionsDisabled = !SubscriptionManager.class.desiredAssertionStatus();
    }
}
