package oracle.kv.impl.tif;

import com.sleepycat.je.log.LogEntryType;
import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.subscription.Subscription;
import com.sleepycat.je.rep.subscription.SubscriptionStatus;
import com.sleepycat.je.utilint.InternalException;
import com.sleepycat.je.utilint.VLSN;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import oracle.kv.impl.rep.subscription.partreader.PartitionReader;
import oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack;
import oracle.kv.impl.tif.SubscriptionManager;
import oracle.kv.impl.topo.PartitionId;

/* loaded from: input_file:oracle/kv/impl/tif/FeederPartReaderCbk.class */
class FeederPartReaderCbk implements PartitionReaderCallBack {
    private final PartitionId partitionId;
    private final SubscriptionManager manager;
    private final BlockingQueue<DataItem> inputQueue;
    private final PartitionReader reader;
    private final Subscription repStreamConsumer;
    private final Logger logger;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public FeederPartReaderCbk(SubscriptionManager subscriptionManager, PartitionId partitionId, Logger logger) {
        this.partitionId = partitionId;
        this.manager = subscriptionManager;
        this.logger = logger;
        this.reader = subscriptionManager.getPartitionReaderMap().get(partitionId);
        this.repStreamConsumer = subscriptionManager.getRepStreamConsumer();
        this.inputQueue = subscriptionManager.getInputQueuePartReader();
    }

    @Override // oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack
    public void processCopy(PartitionId partitionId, long j, long j2, byte[] bArr, byte[] bArr2) {
        DataItem dataItem = new DataItem(new VLSN(j), DataItem.TXN_ID_COPY_IN_PARTTRANS, bArr, bArr2);
        dataItem.setPartitionId(partitionId);
        processEntry(dataItem);
    }

    @Override // oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack
    public void processPut(PartitionId partitionId, long j, long j2, byte[] bArr, byte[] bArr2, long j3) {
        DataItem dataItem = new DataItem(new VLSN(j), j3, bArr, bArr2);
        dataItem.setPartitionId(partitionId);
        processEntry(dataItem);
    }

    @Override // oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack
    public void processDel(PartitionId partitionId, long j, byte[] bArr, long j2) {
        DataItem dataItem = new DataItem(new VLSN(j), j2, bArr);
        dataItem.setPartitionId(partitionId);
        processEntry(dataItem);
    }

    @Override // oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack
    public void processPrepare(PartitionId partitionId, long j) {
    }

    @Override // oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack
    public void processCommit(PartitionId partitionId, long j) {
        DataItem dataItem = new DataItem(VLSN.NULL_VLSN, j, LogEntryType.LOG_TXN_COMMIT);
        dataItem.setPartitionId(partitionId);
        processEntry(dataItem);
    }

    @Override // oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack
    public void processAbort(PartitionId partitionId, long j) {
        DataItem dataItem = new DataItem(VLSN.NULL_VLSN, j, LogEntryType.LOG_TXN_ABORT);
        dataItem.setPartitionId(partitionId);
        processEntry(dataItem);
    }

    @Override // oracle.kv.impl.rep.subscription.partreader.PartitionReaderCallBack
    public synchronized void processEOD(PartitionId partitionId) {
        SubscriptionManager.SubscriptionFilter subscriptionFilter = this.manager.getSubscriptionFilter();
        VLSN vlsn = new VLSN(this.reader.getHighestVLSN());
        this.logger.info(this.partitionId + " transfer is done at high vlsn " + vlsn);
        subscriptionFilter.addPartition(partitionId);
        try {
            if (subscriptionFilter.getCompletePartitions().size() == 1) {
                if (!$assertionsDisabled && this.repStreamConsumer == null) {
                    throw new AssertionError();
                }
                if (this.repStreamConsumer.getSubscriptionStatus().equals(SubscriptionStatus.SUCCESS)) {
                    this.logger.info("the rep stream consumer has already started when " + this.partitionId + " is done.");
                } else {
                    this.repStreamConsumer.start(vlsn.getNext());
                    if (!$assertionsDisabled && !this.repStreamConsumer.getSubscriptionStatus().equals(SubscriptionStatus.SUCCESS)) {
                        throw new AssertionError();
                    }
                    this.logger.info("successfully start rep stream consumer from VLSN " + vlsn.getNext() + " after finishing " + this.partitionId);
                }
            }
            if (this.manager.allPartComplete()) {
                this.manager.setSubscriptionState(SubscriptionState.REPLICATION_STREAM);
            }
        } catch (InsufficientLogException | IllegalArgumentException | TimeoutException | GroupShutdownException | InternalException e) {
            this.logger.warning("Unable to subscribe from VLSN " + vlsn.getNext() + " after finishing " + this.partitionId + ", reason: " + e.getMessage());
            this.manager.shutdown(SubscriptionState.ERROR);
        }
    }

    private void processEntry(DataItem dataItem) {
        while (true) {
            try {
                this.inputQueue.put(dataItem);
                return;
            } catch (InterruptedException e) {
                this.logger.warning(this.partitionId + ": interrupted input queue operation put, retrying");
            }
        }
    }

    static {
        $assertionsDisabled = !FeederPartReaderCbk.class.desiredAssertionStatus();
    }
}
