package io.debezium.pipeline.txmetadata;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.SchemaNameAdjuster;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/debezium/pipeline/txmetadata/TransactionMonitor.class */
public class TransactionMonitor {
    private static final String TOPIC_SUFFIX = ".transaction";
    public static final String DEBEZIUM_TRANSACTION_KEY = "transaction";
    public static final String DEBEZIUM_TRANSACTION_ID_KEY = "id";
    private final EventMetadataProvider eventMetadataProvider;
    private final String topicName;
    private final BlockingConsumer<SourceRecord> sender;
    private final CommonConnectorConfig connectorConfig;
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionMonitor.class);
    private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
    public static final String DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY = "total_order";
    public static final String DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY = "data_collection_order";
    public static final Schema TRANSACTION_BLOCK_SCHEMA = SchemaBuilder.struct().optional().field("id", Schema.STRING_SCHEMA).field(DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA).field(DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA).build();
    private static final Schema TRANSACTION_KEY_SCHEMA = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector.common.TransactionMetadataKey")).field("id", Schema.STRING_SCHEMA).build();
    public static final String DEBEZIUM_TRANSACTION_COLLECTION_KEY = "data_collection";
    public static final String DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY = "event_count";
    private static final Schema EVENT_COUNT_PER_DATA_COLLECTION_SCHEMA = SchemaBuilder.struct().field(DEBEZIUM_TRANSACTION_COLLECTION_KEY, Schema.STRING_SCHEMA).field(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, Schema.INT64_SCHEMA).build();
    public static final String DEBEZIUM_TRANSACTION_STATUS_KEY = "status";
    public static final String DEBEZIUM_TRANSACTION_DATA_COLLECTIONS_KEY = "data_collections";
    private static final Schema TRANSACTION_VALUE_SCHEMA = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector.common.TransactionMetadataValue")).field(DEBEZIUM_TRANSACTION_STATUS_KEY, Schema.STRING_SCHEMA).field("id", Schema.STRING_SCHEMA).field(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, Schema.OPTIONAL_INT64_SCHEMA).field(DEBEZIUM_TRANSACTION_DATA_COLLECTIONS_KEY, SchemaBuilder.array(EVENT_COUNT_PER_DATA_COLLECTION_SCHEMA).optional().build()).build();

    public TransactionMonitor(CommonConnectorConfig commonConnectorConfig, EventMetadataProvider eventMetadataProvider, BlockingConsumer<SourceRecord> blockingConsumer) {
        Objects.requireNonNull(eventMetadataProvider);
        this.topicName = commonConnectorConfig.getLogicalName() + TOPIC_SUFFIX;
        this.eventMetadataProvider = eventMetadataProvider;
        this.sender = blockingConsumer;
        this.connectorConfig = commonConnectorConfig;
    }

    public void dataEvent(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) throws InterruptedException {
        if (this.connectorConfig.shouldProvideTransactionMetadata()) {
            TransactionContext transactionContext = offsetContext.getTransactionContext();
            String transactionId = this.eventMetadataProvider.getTransactionId(dataCollectionId, offsetContext, obj, struct);
            if (transactionId == null) {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Event '{}' has no transaction id", this.eventMetadataProvider.toSummaryString(dataCollectionId, offsetContext, obj, struct));
                    return;
                }
                return;
            }
            if (!transactionContext.isTransactionInProgress()) {
                transactionContext.beginTransaction(transactionId);
                beginTransaction(offsetContext);
            } else if (!transactionContext.getTransactionId().equals(transactionId)) {
                endTransaction(offsetContext);
                transactionContext.endTransaction();
                transactionContext.beginTransaction(transactionId);
                beginTransaction(offsetContext);
            }
            transactionEvent(offsetContext, dataCollectionId, struct);
        }
    }

    public void transactionComittedEvent(OffsetContext offsetContext) throws InterruptedException {
        if (this.connectorConfig.shouldProvideTransactionMetadata()) {
            endTransaction(offsetContext);
            offsetContext.getTransactionContext().endTransaction();
        }
    }

    public void transactionStartedEvent(String str, OffsetContext offsetContext) throws InterruptedException {
        if (this.connectorConfig.shouldProvideTransactionMetadata()) {
            offsetContext.getTransactionContext().beginTransaction(str);
            beginTransaction(offsetContext);
        }
    }

    private void transactionEvent(OffsetContext offsetContext, DataCollectionId dataCollectionId, Struct struct) {
        long event = offsetContext.getTransactionContext().event(dataCollectionId);
        if (struct == null) {
            LOGGER.debug("Event with key {} without value. Cannot enrich source block.");
            return;
        }
        Struct struct2 = new Struct(TRANSACTION_BLOCK_SCHEMA);
        struct2.put("id", offsetContext.getTransactionContext().getTransactionId());
        struct2.put(DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Long.valueOf(offsetContext.getTransactionContext().getTotalEventCount()));
        struct2.put(DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Long.valueOf(event));
        struct.put("transaction", struct2);
    }

    private void beginTransaction(OffsetContext offsetContext) throws InterruptedException {
        Struct struct = new Struct(TRANSACTION_KEY_SCHEMA);
        struct.put("id", offsetContext.getTransactionContext().getTransactionId());
        Struct struct2 = new Struct(TRANSACTION_VALUE_SCHEMA);
        struct2.put(DEBEZIUM_TRANSACTION_STATUS_KEY, TransactionStatus.BEGIN.name());
        struct2.put("id", offsetContext.getTransactionContext().getTransactionId());
        this.sender.accept(new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), this.topicName, null, struct.schema(), struct, struct2.schema(), struct2));
    }

    private void endTransaction(OffsetContext offsetContext) throws InterruptedException {
        Struct struct = new Struct(TRANSACTION_KEY_SCHEMA);
        struct.put("id", offsetContext.getTransactionContext().getTransactionId());
        Struct struct2 = new Struct(TRANSACTION_VALUE_SCHEMA);
        struct2.put(DEBEZIUM_TRANSACTION_STATUS_KEY, TransactionStatus.END.name());
        struct2.put("id", offsetContext.getTransactionContext().getTransactionId());
        struct2.put(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, Long.valueOf(offsetContext.getTransactionContext().getTotalEventCount()));
        Set<Map.Entry<String, Long>> entrySet = offsetContext.getTransactionContext().getPerTableEventCount().entrySet();
        ArrayList arrayList = new ArrayList(entrySet.size());
        for (Map.Entry<String, Long> entry : entrySet) {
            Struct struct3 = new Struct(EVENT_COUNT_PER_DATA_COLLECTION_SCHEMA);
            struct3.put(DEBEZIUM_TRANSACTION_COLLECTION_KEY, entry.getKey());
            struct3.put(DEBEZIUM_TRANSACTION_EVENT_COUNT_KEY, entry.getValue());
            arrayList.add(struct3);
        }
        struct2.put(DEBEZIUM_TRANSACTION_DATA_COLLECTIONS_KEY, arrayList);
        this.sender.accept(new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), this.topicName, null, struct.schema(), struct, struct2.schema(), struct2));
    }
}
