package io.debezium.connector.informix;

import com.informix.jdbc.IfmxReadableType;
import com.informix.jdbcx.IfxDataSource;
import com.informix.stream.api.IfmxStreamOperationRecord;
import com.informix.stream.api.IfmxStreamRecord;
import com.informix.stream.api.IfmxStreamRecordType;
import com.informix.stream.cdc.IfxCDCEngine;
import com.informix.stream.cdc.records.IfxCDCBeginTransactionRecord;
import com.informix.stream.cdc.records.IfxCDCCommitTransactionRecord;
import com.informix.stream.cdc.records.IfxCDCTruncateRecord;
import com.informix.stream.impl.IfxStreamException;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/informix/InformixStreamingChangeEventSource.class */
public class InformixStreamingChangeEventSource implements StreamingChangeEventSource<InformixPartition, InformixOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(InformixStreamingChangeEventSource.class);
    private static final String RECEIVED_GENERIC_RECORD = "Received {} ElapsedT [{}ms]";
    private static final String RECEIVED_UNKNOWN_RECORD_TYPE = "Received unknown record-type {} ElapsedT [{}ms]";
    private final InformixConnectorConfig connectorConfig;
    private final InformixConnection dataConnection;
    private final EventDispatcher<InformixPartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final InformixDatabaseSchema schema;
    private InformixOffsetContext effectiveOffsetContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.debezium.connector.informix.InformixStreamingChangeEventSource$1, reason: invalid class name */
    /* loaded from: input_file:io/debezium/connector/informix/InformixStreamingChangeEventSource$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$informix$stream$api$IfmxStreamRecordType = new int[IfmxStreamRecordType.values().length];

        static {
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.TRANSACTION_GROUP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.METADATA.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.TIMEOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.ERROR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.INSERT.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.BEFORE_UPDATE.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.AFTER_UPDATE.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.DELETE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$informix$stream$api$IfmxStreamRecordType[IfmxStreamRecordType.TRUNCATE.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    public InformixStreamingChangeEventSource(InformixConnectorConfig informixConnectorConfig, InformixConnection informixConnection, EventDispatcher<InformixPartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, InformixDatabaseSchema informixDatabaseSchema) {
        this.connectorConfig = informixConnectorConfig;
        this.dataConnection = informixConnection;
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = informixDatabaseSchema;
    }

    public void init(InformixOffsetContext informixOffsetContext) {
        this.effectiveOffsetContext = informixOffsetContext == null ? new InformixOffsetContext(this.connectorConfig, TxLogPosition.current(), false, false) : informixOffsetContext;
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, InformixPartition informixPartition, InformixOffsetContext informixOffsetContext) throws InterruptedException {
        if (!this.connectorConfig.getSnapshotMode().shouldStream()) {
            LOGGER.info("Streaming is not enabled in current configuration");
            return;
        }
        TxLogPosition changePosition = informixOffsetContext.getChangePosition();
        Lsn commitLsn = changePosition.getCommitLsn();
        Lsn beginLsn = changePosition.getBeginLsn();
        try {
            InformixCdcTransactionEngine transactionEngine = getTransactionEngine(changeEventSourceContext, this.schema, beginLsn);
            try {
                transactionEngine.init();
                InformixStreamTransactionRecord m4getTransaction = transactionEngine.m4getTransaction();
                if (beginLsn.compareTo(commitLsn) < 0) {
                    LOGGER.info("Begin recover: from lastBeginLsn='{}' to lastCommitLsn='{}'", beginLsn, commitLsn);
                    while (true) {
                        if (!changeEventSourceContext.isRunning()) {
                            break;
                        }
                        if (changeEventSourceContext.isPaused()) {
                            LOGGER.info("Streaming will now pause");
                            changeEventSourceContext.streamingPaused();
                            changeEventSourceContext.waitSnapshotCompletion();
                            LOGGER.info("Streaming resumed");
                        }
                        this.dispatcher.dispatchHeartbeatEvent(informixPartition, informixOffsetContext);
                        Lsn of = Lsn.of(Long.valueOf(m4getTransaction.getEndRecord().getSequenceId()));
                        if (of.compareTo(commitLsn) < 0) {
                            LOGGER.info("Skipping transaction with id: '{}' since commitLsn='{}' < lastCommitLsn='{}'", new Object[]{Integer.valueOf(m4getTransaction.getTransactionId()), of, commitLsn});
                        } else {
                            if (of.compareTo(commitLsn) > 0) {
                                LOGGER.info("Recover finished: from lastBeginLsn='{}' to lastCommitLsn='{}', current Lsn='{}'", new Object[]{beginLsn, commitLsn, of});
                                break;
                            }
                            handleTransaction(transactionEngine, informixPartition, informixOffsetContext, m4getTransaction, true);
                        }
                        m4getTransaction = transactionEngine.m4getTransaction();
                    }
                }
                InformixStreamTransactionRecord informixStreamTransactionRecord = m4getTransaction;
                while (changeEventSourceContext.isRunning()) {
                    if (changeEventSourceContext.isPaused()) {
                        LOGGER.info("Streaming will now pause");
                        changeEventSourceContext.streamingPaused();
                        changeEventSourceContext.waitSnapshotCompletion();
                        LOGGER.info("Streaming resumed");
                    }
                    this.dispatcher.dispatchHeartbeatEvent(informixPartition, informixOffsetContext);
                    switch (AnonymousClass1.$SwitchMap$com$informix$stream$api$IfmxStreamRecordType[informixStreamTransactionRecord.getType().ordinal()]) {
                        case 1:
                            handleTransaction(transactionEngine, informixPartition, informixOffsetContext, informixStreamTransactionRecord, false);
                            break;
                        case 2:
                        case 3:
                        case 4:
                            LOGGER.debug(RECEIVED_GENERIC_RECORD, informixStreamTransactionRecord, 0);
                            break;
                        default:
                            LOGGER.debug(RECEIVED_UNKNOWN_RECORD_TYPE, informixStreamTransactionRecord, 0);
                            break;
                    }
                    informixStreamTransactionRecord = transactionEngine.getRecord();
                }
                if (transactionEngine != null) {
                    transactionEngine.close();
                }
            } finally {
            }
        } catch (InterruptedException e) {
            LOGGER.error("Caught InterruptedException", e);
            this.errorHandler.setProducerThrowable(e);
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            LOGGER.error("Caught Exception", e2);
            this.errorHandler.setProducerThrowable(e2);
        }
    }

    public void commitOffset(Map<String, ?> map, Map<String, ?> map2) {
    }

    /* renamed from: getOffsetContext, reason: merged with bridge method [inline-methods] */
    public InformixOffsetContext m18getOffsetContext() {
        return this.effectiveOffsetContext;
    }

    public InformixCdcTransactionEngine getTransactionEngine(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, InformixDatabaseSchema informixDatabaseSchema, Lsn lsn) throws SQLException {
        return new InformixCdcTransactionEngine(changeEventSourceContext, getCDCEngine(informixDatabaseSchema, lsn));
    }

    private IfxCDCEngine getCDCEngine(InformixDatabaseSchema informixDatabaseSchema, Lsn lsn) throws SQLException {
        IfxCDCEngine.Builder timeout = IfxCDCEngine.builder(new IfxDataSource(this.dataConnection.connectionString())).buffer(this.connectorConfig.getCdcBuffersize()).timeout(this.connectorConfig.getCdcTimeout());
        informixDatabaseSchema.tableIds().forEach(tableId -> {
            timeout.watchTable(tableId.identifier(), (String[]) informixDatabaseSchema.tableFor(tableId).retrieveColumnNames().toArray(i -> {
                return new String[i];
            }));
        });
        if (lsn.isAvailable()) {
            timeout.sequenceId(lsn.sequence());
        }
        if (LOGGER.isInfoEnabled()) {
            long sequenceId = timeout.getSequenceId();
            LOGGER.info("Set CDCEngine's LSN to '{}' aka {}", Long.valueOf(sequenceId), Lsn.of(Long.valueOf(sequenceId)).toLongString());
        }
        return timeout.build();
    }

    private void handleTransaction(InformixCdcTransactionEngine informixCdcTransactionEngine, InformixPartition informixPartition, InformixOffsetContext informixOffsetContext, InformixStreamTransactionRecord informixStreamTransactionRecord, boolean z) throws InterruptedException, IfxStreamException {
        long nanoTime = System.nanoTime();
        int transactionId = informixStreamTransactionRecord.getTransactionId();
        IfxCDCBeginTransactionRecord beginRecord = informixStreamTransactionRecord.getBeginRecord();
        IfxCDCCommitTransactionRecord endRecord = informixStreamTransactionRecord.getEndRecord();
        long nanoTime2 = System.nanoTime();
        long time = beginRecord.getTime();
        long sequenceId = beginRecord.getSequenceId();
        long orElse = informixCdcTransactionEngine.getLowestBeginSequence().orElse(sequenceId);
        long sequenceId2 = endRecord.getSequenceId();
        if (!z) {
            updateChangePosition(informixOffsetContext, null, Long.valueOf(sequenceId), Integer.valueOf(transactionId), Long.valueOf(orElse));
            this.dispatcher.dispatchTransactionStartedEvent(informixPartition, String.valueOf(transactionId), informixOffsetContext, Instant.ofEpochSecond(time));
        }
        long nanoTime3 = System.nanoTime();
        LOGGER.debug("Received {} Time [{}] UserId [{}] ElapsedT [{}ms]", new Object[]{beginRecord, Long.valueOf(time), Integer.valueOf(beginRecord.getUserId()), Double.valueOf((nanoTime3 - nanoTime2) / 1000000.0d)});
        if (IfmxStreamRecordType.COMMIT.equals(endRecord.getType())) {
            IfxCDCCommitTransactionRecord ifxCDCCommitTransactionRecord = endRecord;
            long sequenceId3 = ifxCDCCommitTransactionRecord.getSequenceId();
            long time2 = ifxCDCCommitTransactionRecord.getTime();
            if (!z) {
                updateChangePosition(informixOffsetContext, Long.valueOf(sequenceId3), null, Integer.valueOf(transactionId), null);
            }
            Map<String, IfmxReadableType> map = null;
            Map<String, TableId> tableIdByLabelId = informixCdcTransactionEngine.getTableIdByLabelId();
            Iterator<IfmxStreamRecord> it = informixStreamTransactionRecord.getRecords().iterator();
            while (it.hasNext()) {
                IfmxStreamOperationRecord ifmxStreamOperationRecord = (IfmxStreamRecord) it.next();
                long nanoTime4 = System.nanoTime();
                long sequenceId4 = ifmxStreamOperationRecord.getSequenceId();
                if (!z || Lsn.of(Long.valueOf(sequenceId4)).compareTo(informixOffsetContext.getChangePosition().getChangeLsn()) > 0) {
                    Optional ofNullable = Optional.ofNullable(ifmxStreamOperationRecord.getLabel());
                    Objects.requireNonNull(tableIdByLabelId);
                    Optional map2 = ofNullable.map((v1) -> {
                        return r1.get(v1);
                    });
                    updateChangePosition(informixOffsetContext, null, Long.valueOf(sequenceId4), Integer.valueOf(transactionId), null);
                    switch (AnonymousClass1.$SwitchMap$com$informix$stream$api$IfmxStreamRecordType[ifmxStreamOperationRecord.getType().ordinal()]) {
                        case 2:
                        case 3:
                        case 4:
                            nanoTime3 = System.nanoTime();
                            LOGGER.debug(RECEIVED_GENERIC_RECORD, ifmxStreamOperationRecord, Double.valueOf((nanoTime3 - nanoTime4) / 1000000.0d));
                            break;
                        case 5:
                            Map<String, IfmxReadableType> data = ifmxStreamOperationRecord.getData();
                            handleOperation(informixPartition, informixOffsetContext, Envelope.Operation.CREATE, null, data, (TableId) map2.orElseThrow());
                            nanoTime3 = System.nanoTime();
                            LOGGER.debug("Received {} ElapsedT [{}ms] Data After [{}]", new Object[]{ifmxStreamOperationRecord, Double.valueOf((nanoTime3 - nanoTime4) / 1000000.0d), data});
                            break;
                        case 6:
                            map = ifmxStreamOperationRecord.getData();
                            nanoTime3 = System.nanoTime();
                            LOGGER.debug("Received {} ElapsedT [{}ms] Data Before [{}]", new Object[]{ifmxStreamOperationRecord, Double.valueOf((nanoTime3 - nanoTime4) / 1000000.0d), map});
                            break;
                        case 7:
                            Map<String, IfmxReadableType> data2 = ifmxStreamOperationRecord.getData();
                            handleOperation(informixPartition, informixOffsetContext, Envelope.Operation.UPDATE, map, data2, (TableId) map2.orElseThrow());
                            nanoTime3 = System.nanoTime();
                            LOGGER.debug("Received {} ElapsedT [{}ms] Data Before [{}] Data After [{}]", new Object[]{ifmxStreamOperationRecord, Double.valueOf((nanoTime3 - nanoTime4) / 1000000.0d), map, data2});
                            break;
                        case 8:
                            map = ifmxStreamOperationRecord.getData();
                            handleOperation(informixPartition, informixOffsetContext, Envelope.Operation.DELETE, map, null, (TableId) map2.orElseThrow());
                            nanoTime3 = System.nanoTime();
                            LOGGER.debug("Received {} ElapsedT [{}ms] Data Before [{}]", new Object[]{ifmxStreamOperationRecord, Double.valueOf((nanoTime3 - nanoTime4) / 1000000.0d), map});
                            break;
                        case 9:
                            Optional map3 = Optional.of(Integer.valueOf(((IfxCDCTruncateRecord) ifmxStreamOperationRecord).getUserId())).map((v0) -> {
                                return v0.toString();
                            });
                            Objects.requireNonNull(tableIdByLabelId);
                            handleOperation(informixPartition, informixOffsetContext, Envelope.Operation.TRUNCATE, null, null, (TableId) map3.map((v1) -> {
                                return r1.get(v1);
                            }).orElseThrow());
                            LOGGER.debug(RECEIVED_GENERIC_RECORD, ifmxStreamOperationRecord, Double.valueOf((nanoTime3 - nanoTime4) / 1000000.0d));
                            break;
                        default:
                            nanoTime3 = System.nanoTime();
                            LOGGER.debug(RECEIVED_UNKNOWN_RECORD_TYPE, ifmxStreamOperationRecord, Double.valueOf((nanoTime3 - nanoTime4) / 1000000.0d));
                            break;
                    }
                } else {
                    LOGGER.info("Skipping already processed record {}", Long.valueOf(sequenceId4));
                }
            }
            nanoTime2 = System.nanoTime();
            updateChangePosition(informixOffsetContext, null, Long.valueOf(sequenceId3), Integer.valueOf(transactionId), null);
            this.dispatcher.dispatchTransactionCommittedEvent(informixPartition, informixOffsetContext, Instant.ofEpochSecond(time2));
            long nanoTime5 = System.nanoTime();
            LOGGER.debug("Received {} Time [{}] UserId [{}] ElapsedT [{}ms]", new Object[]{endRecord, Long.valueOf(time2), Integer.valueOf(beginRecord.getUserId()), Double.valueOf((nanoTime5 - nanoTime2) / 1000000.0d)});
            LOGGER.debug("Handle Transaction Events [{}], ElapsedT [{}ms]", Integer.valueOf(informixStreamTransactionRecord.getRecords().size()), Double.valueOf((nanoTime5 - nanoTime) / 1000000.0d));
        }
        if (IfmxStreamRecordType.ROLLBACK.equals(endRecord.getType())) {
            if (!z) {
                updateChangePosition(informixOffsetContext, Long.valueOf(sequenceId2), Long.valueOf(sequenceId2), Integer.valueOf(transactionId), null);
                informixOffsetContext.getTransactionContext().endTransaction();
            }
            LOGGER.debug(RECEIVED_GENERIC_RECORD, endRecord, Double.valueOf((System.nanoTime() - nanoTime2) / 1000000.0d));
        }
    }

    private void updateChangePosition(InformixOffsetContext informixOffsetContext, Long l, Long l2, Integer num, Long l3) {
        informixOffsetContext.setChangePosition(TxLogPosition.cloneAndSet(informixOffsetContext.getChangePosition(), Lsn.of(l), Lsn.of(l2), num, Lsn.of(l3)));
    }

    private void handleOperation(InformixPartition informixPartition, InformixOffsetContext informixOffsetContext, Envelope.Operation operation, Map<String, IfmxReadableType> map, Map<String, IfmxReadableType> map2, TableId tableId) throws InterruptedException {
        informixOffsetContext.event(tableId, this.clock.currentTime());
        this.dispatcher.dispatchDataChangeEvent(informixPartition, tableId, new InformixChangeRecordEmitter(informixPartition, informixOffsetContext, operation, InformixChangeRecordEmitter.convertIfxData2Array(map, this.schema.schemaFor(tableId)), InformixChangeRecordEmitter.convertIfxData2Array(map2, this.schema.schemaFor(tableId)), this.clock, this.connectorConfig));
    }
}
