package oracle.kv.impl.tif;

import com.sleepycat.je.log.LogEntryType;
import com.sleepycat.je.rep.subscription.SubscriptionCallback;
import com.sleepycat.je.utilint.VLSN;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
import oracle.kv.impl.tif.SubscriptionManager;

/* loaded from: input_file:oracle/kv/impl/tif/FeederSubscriptionCbk.class */
/**
 * Subscription callback that converts each replication stream event into a
 * {@link DataItem}, passes it through the configured filter, and places the
 * surviving items on the input queue.
 */
class FeederSubscriptionCbk implements SubscriptionCallback {
    /** Logger used for stream exceptions and interrupted queue operations. */
    protected final Logger logger;
    /** Filter applied to every item before it is enqueued. */
    protected final SubscriptionManager.SubscriptionFilter filter;
    /** Queue that receives the items that pass the filter. */
    protected final BlockingQueue<DataItem> inputQueueRepStream;

    /**
     * Creates a callback bound to the given filter, queue and logger.
     *
     * @param subscriptionFilter filter applied to each item; an item for
     *                           which it returns {@code null} is dropped
     * @param blockingQueue      queue on which accepted items are placed
     * @param logger             logger for diagnostics
     */
    public FeederSubscriptionCbk(SubscriptionManager.SubscriptionFilter subscriptionFilter, BlockingQueue<DataItem> blockingQueue, Logger logger) {
        this.filter = subscriptionFilter;
        this.inputQueueRepStream = blockingQueue;
        this.logger = logger;
    }

    /**
     * Handles a put event by wrapping its arguments in a {@link DataItem}
     * and submitting it for filtering and enqueueing.
     *
     * @param vlsn  VLSN passed through to the item
     * @param bArr  first byte array passed through to the item
     *              (presumably the key; confirm against DataItem)
     * @param bArr2 second byte array passed through to the item
     *              (presumably the value; confirm against DataItem)
     * @param j     long passed through to the item
     *              (presumably the transaction id; confirm against DataItem)
     */
    public void processPut(VLSN vlsn, byte[] bArr, byte[] bArr2, long j) {
        processEntry(new DataItem(vlsn, j, bArr, bArr2));
    }

    /**
     * Handles a delete event by wrapping its arguments in a {@link DataItem}
     * and submitting it for filtering and enqueueing.
     *
     * @param vlsn VLSN passed through to the item
     * @param bArr byte array passed through to the item
     *             (presumably the key; confirm against DataItem)
     * @param j    long passed through to the item
     *             (presumably the transaction id; confirm against DataItem)
     */
    public void processDel(VLSN vlsn, byte[] bArr, long j) {
        processEntry(new DataItem(vlsn, j, bArr));
    }

    /**
     * Handles a commit event by submitting a {@link DataItem} tagged with
     * {@link LogEntryType#LOG_TXN_COMMIT}.
     *
     * @param vlsn VLSN passed through to the item
     * @param j    long passed through to the item
     */
    public void processCommit(VLSN vlsn, long j) {
        processEntry(new DataItem(vlsn, j, LogEntryType.LOG_TXN_COMMIT));
    }

    /**
     * Handles an abort event by submitting a {@link DataItem} tagged with
     * {@link LogEntryType#LOG_TXN_ABORT}.
     *
     * @param vlsn VLSN passed through to the item
     * @param j    long passed through to the item
     */
    public void processAbort(VLSN vlsn, long j) {
        processEntry(new DataItem(vlsn, j, LogEntryType.LOG_TXN_ABORT));
    }

    /**
     * Logs the exception's localized message at INFO level, then submits a
     * {@link DataItem} that carries the exception.
     *
     * @param exc exception reported by the subscription
     */
    public void processException(Exception exc) {
        this.logger.info("Exception in subscription of replication stream " + exc.getLocalizedMessage());
        processEntry(new DataItem(exc));
    }

    /**
     * Filters the item and, unless the filter returns {@code null}, blocks
     * until the result has been placed on the input queue.
     *
     * <p>An interrupt received while blocked does not abandon the item: the
     * put is retried until it succeeds. The interrupt is remembered and the
     * thread's interrupt status is re-asserted before returning, so the
     * request is not lost to code further up the call stack.
     *
     * @param dataItem item to filter and enqueue
     */
    private void processEntry(DataItem dataItem) {
        final DataItem filtered = this.filter.filter(dataItem);
        if (filtered == null) {
            return;
        }
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    this.inputQueueRepStream.put(filtered);
                    return;
                } catch (InterruptedException e) {
                    // The throw cleared the interrupt flag; note it so it can
                    // be restored once the item is safely enqueued.
                    interrupted = true;
                    this.logger.warning("Interrupted input queue operation put, retrying");
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
