package io.debezium.connector.cassandra;

import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CellData;
import io.debezium.connector.cassandra.SchemaHolder;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import io.debezium.time.Conversions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/CommitLogReadHandlerImpl.class */
public class CommitLogReadHandlerImpl implements CommitLogReadHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommitLogReadHandlerImpl.class);
    private static final boolean MARK_OFFSET = true;
    private final ChangeEventQueue<Event> queue;
    private final RecordMaker recordMaker;
    private final OffsetWriter offsetWriter;
    private final SchemaHolder schemaHolder;
    private final CommitLogProcessorMetrics metrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/debezium/connector/cassandra/CommitLogReadHandlerImpl$PartitionType.class */
    /**
     * Classification of a {@link PartitionUpdate} read from the commit log.
     *
     * <p>Only the types listed in {@link #supportedPartitionTypes} are reported as valid by
     * {@link #isValid(PartitionType)}.
     */
    public enum PartitionType {
        PARTITION_KEY_ROW_DELETION,
        PARTITION_AND_CLUSTERING_KEY_ROW_DELETION,
        ROW_LEVEL_MODIFICATION,
        MATERIALIZED_VIEW,
        SECONDARY_INDEX,
        COUNTER;

        /** Partition types for which {@link #isValid(PartitionType)} returns {@code true}. */
        static final Set<PartitionType> supportedPartitionTypes = new HashSet<>(Arrays.asList(PARTITION_KEY_ROW_DELETION, ROW_LEVEL_MODIFICATION));

        /**
         * Classifies a partition update.
         *
         * <p>The table-kind checks (counter, view, index) take precedence, in that order. Otherwise
         * the result depends on whether the update carries a partition-level deletion and whether
         * the table declares clustering columns.
         *
         * @param partitionUpdate the update to classify
         * @return the matching partition type
         */
        public static PartitionType getPartitionType(PartitionUpdate partitionUpdate) {
            if (partitionUpdate.metadata().isCounter()) {
                return COUNTER;
            }
            if (partitionUpdate.metadata().isView()) {
                return MATERIALIZED_VIEW;
            }
            if (partitionUpdate.metadata().isIndex()) {
                return SECONDARY_INDEX;
            }
            if (!isPartitionDeletion(partitionUpdate)) {
                // No partition-level deletion: whatever changed, it changed at row level.
                return ROW_LEVEL_MODIFICATION;
            }
            if (hasClusteringKeys(partitionUpdate)) {
                return PARTITION_AND_CLUSTERING_KEY_ROW_DELETION;
            }
            return PARTITION_KEY_ROW_DELETION;
        }

        /**
         * @param partitionType the type to check
         * @return {@code true} if the type is one of {@link #supportedPartitionTypes}
         */
        public static boolean isValid(PartitionType partitionType) {
            return supportedPartitionTypes.contains(partitionType);
        }

        /**
         * @param partitionUpdate the update whose table metadata is inspected
         * @return {@code true} if the table declares at least one clustering column
         */
        public static boolean hasClusteringKeys(PartitionUpdate partitionUpdate) {
            boolean noClusteringColumns = partitionUpdate.metadata().clusteringColumns().isEmpty();
            return !noClusteringColumns;
        }

        /**
         * @param partitionUpdate the update to inspect
         * @return {@code true} if the partition-level deletion timestamp is greater than
         *         {@code Long.MIN_VALUE} (presumably the "no deletion" sentinel)
         */
        public static boolean isPartitionDeletion(PartitionUpdate partitionUpdate) {
            long deletedAt = partitionUpdate.partitionLevelDeletion().markedForDeleteAt();
            return deletedAt > Long.MIN_VALUE;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/debezium/connector/cassandra/CommitLogReadHandlerImpl$RowType.class */
    /**
     * Classification of a single {@link Unfiltered} entry of a partition update.
     *
     * <p>Only the types listed in {@link #supportedRowTypes} are reported as valid by
     * {@link #isValid(RowType)}.
     */
    public enum RowType {
        INSERT,
        UPDATE,
        DELETE,
        RANGE_TOMBSTONE,
        UNKNOWN;

        /** Row types for which {@link #isValid(RowType)} returns {@code true}. */
        static final Set<RowType> supportedRowTypes = new HashSet<>(Arrays.asList(INSERT, UPDATE, DELETE));

        /**
         * Classifies an unfiltered entry.
         *
         * <p>Range tombstone markers are recognised first. For rows, a deletion wins over an
         * insert, which wins over an update. Anything else is {@link #UNKNOWN}.
         *
         * @param unfiltered the entry to classify
         * @return the matching row type
         */
        public static RowType getRowType(Unfiltered unfiltered) {
            if (unfiltered.isRangeTombstoneMarker()) {
                return RANGE_TOMBSTONE;
            }
            if (!unfiltered.isRow()) {
                return UNKNOWN;
            }
            Row candidate = (Row) unfiltered;
            if (isDelete(candidate)) {
                return DELETE;
            }
            if (isInsert(candidate)) {
                return INSERT;
            }
            return isUpdate(candidate) ? UPDATE : UNKNOWN;
        }

        /**
         * @param rowType the type to check
         * @return {@code true} if the type is one of {@link #supportedRowTypes}
         */
        public static boolean isValid(RowType rowType) {
            return supportedRowTypes.contains(rowType);
        }

        /**
         * @param row the row to inspect
         * @return {@code true} if the row deletion timestamp is greater than {@code Long.MIN_VALUE}
         */
        public static boolean isDelete(Row row) {
            long deletedAt = row.deletion().time().markedForDeleteAt();
            return deletedAt > Long.MIN_VALUE;
        }

        /**
         * @param row the row to inspect
         * @return {@code true} if the primary-key liveness timestamp is greater than
         *         {@code Long.MIN_VALUE}
         */
        public static boolean isInsert(Row row) {
            long livenessTimestamp = row.primaryKeyLivenessInfo().timestamp();
            return livenessTimestamp > Long.MIN_VALUE;
        }

        /**
         * @param row the row to inspect
         * @return {@code true} if the primary-key liveness timestamp equals {@code Long.MIN_VALUE}
         */
        public static boolean isUpdate(Row row) {
            long livenessTimestamp = row.primaryKeyLivenessInfo().timestamp();
            return livenessTimestamp == Long.MIN_VALUE;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a handler that stores the given collaborators for later use.
     *
     * @param schemaHolder source of key/value schemas and table metadata, looked up per table
     * @param changeEventQueue queue whose {@code enqueue} method receives the produced events
     * @param offsetWriter consulted to decide whether a commit log position was already processed
     * @param recordMaker builds insert/update/delete records from the extracted row data
     * @param commitLogProcessorMetrics receives position, success and error notifications
     */
    public CommitLogReadHandlerImpl(SchemaHolder schemaHolder, ChangeEventQueue<Event> changeEventQueue, OffsetWriter offsetWriter, RecordMaker recordMaker, CommitLogProcessorMetrics commitLogProcessorMetrics) {
        this.queue = changeEventQueue;
        this.offsetWriter = offsetWriter;
        this.recordMaker = recordMaker;
        this.schemaHolder = schemaHolder;
        this.metrics = commitLogProcessorMetrics;
    }

    /**
     * Handles one mutation read from the commit log.
     *
     * <p>Mutations that are not tracked by CDC are ignored. Otherwise the commit log position is
     * published to the metrics and every partition update of the mutation is processed, unless the
     * offset writer reports that position as already processed for the table.
     *
     * @param mutation the mutation read from the commit log
     * @param i not used by this implementation
     * @param i2 position within the commit log; reported to metrics and used to build the offset
     * @param commitLogDescriptor descriptor of the commit log; its file name is part of the offset
     * @throws DebeziumException if processing a partition update fails
     */
    public void handleMutation(Mutation mutation, int i, int i2, CommitLogDescriptor commitLogDescriptor) {
        if (!mutation.trackedByCDC()) {
            return;
        }
        this.metrics.setCommitLogPosition(i2);
        for (PartitionUpdate update : mutation.getPartitionUpdates()) {
            OffsetPosition position = new OffsetPosition(commitLogDescriptor.fileName(), i2);
            KeyspaceTable table = new KeyspaceTable(mutation.getKeyspaceName(), update.metadata().cfName);
            boolean alreadyProcessed = this.offsetWriter.isOffsetProcessed(table.name(), position.serialize(), false);
            if (alreadyProcessed) {
                // NOTE(review): this returns from the whole method, so any remaining partition
                // updates of this mutation are skipped and onSuccess() is not called - confirm
                // that this is intended rather than a per-update skip.
                LOGGER.debug("Mutation at {} for table {} already processed, skipping...", position, table);
                return;
            }
            try {
                process(update, position, table);
            } catch (Exception e) {
                String message = String.format("Failed to process PartitionUpdate %s at %s for table %s.", update.toString(), position.toString(), table.name());
                throw new DebeziumException(message, e);
            }
        }
        this.metrics.onSuccess();
    }

    /**
     * Logs the unrecoverable commit log read error and notifies the metrics.
     *
     * <p>The exception is not rethrown by this method.
     *
     * @param commitLogReadException the error reported by the commit log reader
     * @throws IOException declared by the interface; not thrown by the statements visible here
     */
    public void handleUnrecoverableError(CommitLogReadHandler.CommitLogReadException commitLogReadException) throws IOException {
        LOGGER.error("Unrecoverable error when reading commit log", commitLogReadException);
        this.metrics.onUnrecoverableError();
    }

    /**
     * Logs a commit log read error and tells the reader not to skip the segment.
     *
     * <p>The log message differs depending on whether the exception is flagged as permissible,
     * but the answer is always {@code false}.
     *
     * @param commitLogReadException the error reported by the commit log reader
     * @return always {@code false}
     * @throws IOException declared by the interface; not thrown by the statements visible here
     */
    public boolean shouldSkipSegmentOnError(CommitLogReadHandler.CommitLogReadException commitLogReadException) throws IOException {
        String message = commitLogReadException.permissible
                ? "Encountered a permissible exception during log replay"
                : "Encountered a non-permissible exception during log replay";
        LOGGER.error(message, commitLogReadException);
        return false;
    }

    /**
     * Dispatches a partition update according to its {@link PartitionType}.
     *
     * <p>Unsupported partition types are logged and skipped. A partition-key deletion is handed to
     * {@link #handlePartitionDeletion}. For a row-level modification every entry of the update is
     * classified, and supported rows are handed to {@link #handleRowModifications}; unsupported
     * entries are logged and skipped.
     *
     * @param partitionUpdate the update to process
     * @param offsetPosition commit log position of the enclosing mutation
     * @param keyspaceTable keyspace and table the update belongs to
     * @throws CassandraConnectorSchemaException if a type passes the validity check but has no
     *         handler here
     */
    private void process(PartitionUpdate partitionUpdate, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
        PartitionType kind = PartitionType.getPartitionType(partitionUpdate);
        if (!PartitionType.isValid(kind)) {
            LOGGER.warn("Encountered an unsupported partition type {}, skipping...", kind);
            return;
        }
        if (kind == PartitionType.PARTITION_KEY_ROW_DELETION) {
            handlePartitionDeletion(partitionUpdate, offsetPosition, keyspaceTable);
            return;
        }
        if (kind == PartitionType.ROW_LEVEL_MODIFICATION) {
            UnfilteredRowIterator entries = partitionUpdate.unfilteredIterator();
            while (entries.hasNext()) {
                Unfiltered entry = (Unfiltered) entries.next();
                RowType rowKind = RowType.getRowType(entry);
                if (!RowType.isValid(rowKind)) {
                    LOGGER.warn("Encountered an unsupported row type {}, skipping...", rowKind);
                    continue;
                }
                handleRowModifications((Row) entry, rowKind, partitionUpdate, offsetPosition, keyspaceTable);
            }
            return;
        }
        throw new CassandraConnectorSchemaException("Unsupported partition type " + kind + " should have been skipped");
    }

    /**
     * Emits a delete record for a partition-level deletion on a table without clustering columns.
     *
     * <p>The row data contains the partition key cells plus one {@code null}-valued cell per
     * non-partition-key column, each stamped with the partition deletion timestamp. Any failure
     * (including a table that unexpectedly declares clustering columns) is logged and swallowed,
     * so this method never propagates an exception.
     *
     * @param partitionUpdate the update carrying the partition deletion
     * @param offsetPosition commit log position of the enclosing mutation
     * @param keyspaceTable keyspace and table the update belongs to
     */
    private void handlePartitionDeletion(PartitionUpdate partitionUpdate, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
        try {
            SchemaHolder.KeyValueSchema keyValueSchema = this.schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
            Schema keySchema = keyValueSchema.keySchema();
            Schema valueSchema = keyValueSchema.valueSchema();

            RowData rowData = new RowData();
            populatePartitionColumns(rowData, partitionUpdate);

            TableMetadata tableMetadata = keyValueSchema.tableMetadata();
            if (!tableMetadata.getClusteringColumns().isEmpty()) {
                throw new CassandraConnectorSchemaException("Uh-oh... clustering key should not exist for partition deletion");
            }

            // Work on a private copy so removeAll can never alter the list held by the metadata.
            List<ColumnMetadata> nonKeyColumns = new ArrayList<>(tableMetadata.getColumns());
            nonKeyColumns.removeAll(tableMetadata.getPartitionKey());
            if (!nonKeyColumns.isEmpty()) {
                Long deletedAt = Long.valueOf(partitionUpdate.deletionInfo().getPartitionDeletion().markedForDeleteAt());
                for (ColumnMetadata column : nonKeyColumns) {
                    rowData.addCell(new CellData(column.getName(), null, deletedAt, CellData.ColumnType.REGULAR));
                }
            }

            // The method reference replaces the decompiler's broken lambda over an undefined
            // register variable; events are handed straight to the queue.
            this.recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                    Conversions.toInstantFromMicros(partitionUpdate.maxTimestamp()), rowData, keySchema, valueSchema, true,
                    this.queue::enqueue);
        } catch (Exception e) {
            LOGGER.error("Fail to delete partition at {}. Reason: {}", offsetPosition, e);
        }
    }

    /**
     * Builds the row data for a single row and emits the matching insert, update or delete record.
     *
     * <p>The event timestamp is the row deletion timestamp for deletes and the partition update's
     * maximum timestamp otherwise. It is converted with {@code Conversions.toInstantFromMicros}.
     *
     * @param row the row being modified
     * @param rowType the classification of the row; must be INSERT, UPDATE or DELETE
     * @param partitionUpdate the update the row belongs to
     * @param offsetPosition commit log position of the enclosing mutation
     * @param keyspaceTable keyspace and table the update belongs to
     * @throws CassandraConnectorTaskException if {@code rowType} is not INSERT, UPDATE or DELETE
     * @throws DebeziumException if a column cannot be populated
     */
    private void handleRowModifications(Row row, RowType rowType, PartitionUpdate partitionUpdate, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
        SchemaHolder.KeyValueSchema keyValueSchema = this.schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
        Schema keySchema = keyValueSchema.keySchema();
        Schema valueSchema = keyValueSchema.valueSchema();

        RowData rowData = new RowData();
        populatePartitionColumns(rowData, partitionUpdate);
        populateClusteringColumns(rowData, row, partitionUpdate);
        populateRegularColumns(rowData, row, rowType, keyValueSchema);

        long eventTimestamp = rowType == RowType.DELETE
                ? row.deletion().time().markedForDeleteAt()
                : partitionUpdate.maxTimestamp();

        // In every branch the method reference replaces the decompiler's broken lambda over an
        // undefined register variable; events are handed straight to the queue.
        switch (rowType) {
            case INSERT:
                this.recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                        Conversions.toInstantFromMicros(eventTimestamp), rowData, keySchema, valueSchema, true,
                        this.queue::enqueue);
                return;
            case UPDATE:
                this.recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                        Conversions.toInstantFromMicros(eventTimestamp), rowData, keySchema, valueSchema, true,
                        this.queue::enqueue);
                return;
            case DELETE:
                this.recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
                        Conversions.toInstantFromMicros(eventTimestamp), rowData, keySchema, valueSchema, true,
                        this.queue::enqueue);
                return;
            default:
                throw new CassandraConnectorTaskException("Unsupported row type " + rowType + " should have been skipped");
        }
    }

    /**
     * Adds one PARTITION cell per partition key column to the row data.
     *
     * <p>Values come from {@link #getPartitionKeys(PartitionUpdate)} and are looked up by each
     * column's position.
     *
     * @param rowData the row data to add cells to
     * @param partitionUpdate the update providing the partition key and table metadata
     * @throws DebeziumException if a cell cannot be created for a column
     */
    private void populatePartitionColumns(RowData rowData, PartitionUpdate partitionUpdate) {
        List<Object> keyValues = getPartitionKeys(partitionUpdate);
        for (ColumnDefinition column : partitionUpdate.metadata().partitionKeyColumns()) {
            try {
                String columnName = column.name.toString();
                Object keyValue = keyValues.get(column.position());
                CellData cell = new CellData(columnName, keyValue, null, CellData.ColumnType.PARTITION);
                rowData.addCell(cell);
            } catch (Exception e) {
                String message = String.format("Failed to populate Column %s with Type %s of Table %s in KeySpace %s.", column.name.toString(), column.type.toString(), column.cfName, column.ksName);
                throw new DebeziumException(message, e);
            }
        }
    }

    /**
     * Adds one CLUSTERING cell per clustering column to the row data.
     *
     * <p>Each value is deserialized from the row's clustering at the column's position.
     *
     * @param rowData the row data to add cells to
     * @param row the row whose clustering values are read
     * @param partitionUpdate the update providing the table metadata
     * @throws DebeziumException if a value cannot be deserialized or a cell cannot be created
     */
    private void populateClusteringColumns(RowData rowData, Row row, PartitionUpdate partitionUpdate) {
        for (ColumnDefinition column : partitionUpdate.metadata().clusteringColumns()) {
            try {
                String columnName = column.name.toString();
                Object clusteringValue = CassandraTypeDeserializer.deserialize((AbstractType<?>) column.type, row.clustering().get(column.position()));
                CellData cell = new CellData(columnName, clusteringValue, null, CellData.ColumnType.CLUSTERING);
                rowData.addCell(cell);
            } catch (Exception e) {
                String message = String.format("Failed to populate Column %s with Type %s of Table %s in KeySpace %s.", column.name.toString(), column.type.toString(), column.cfName, column.ksName);
                throw new DebeziumException(message, e);
            }
        }
    }

    /**
     * Adds REGULAR cells for the non-primary-key columns of a row.
     *
     * <ul>
     *   <li>DELETE: one {@code null}-valued cell per non-primary-key column of the table, each
     *       stamped with the row deletion timestamp.</li>
     *   <li>INSERT / UPDATE: one cell per column present in the row. Multi-cell collections are
     *       deserialized from the complex column data. Other columns are deserialized from their
     *       single cell ({@code null} for tombstones), and expiring cells carry their local
     *       deletion time converted from seconds to microseconds.</li>
     *   <li>Any other row type: nothing is added.</li>
     * </ul>
     *
     * @param rowData the row data to add cells to
     * @param row the row being read
     * @param rowType the classification of the row
     * @param keyValueSchema provides the table metadata used for deletes
     * @throws DebeziumException if a column of an inserted or updated row cannot be populated
     */
    private void populateRegularColumns(RowData rowData, Row row, RowType rowType, SchemaHolder.KeyValueSchema keyValueSchema) {
        if (rowType == RowType.DELETE) {
            TableMetadata tableMetadata = keyValueSchema.tableMetadata();
            // Work on a private copy so removeAll can never alter the list held by the metadata.
            List<ColumnMetadata> regularColumns = new ArrayList<>(tableMetadata.getColumns());
            regularColumns.removeAll(tableMetadata.getPrimaryKey());
            for (ColumnMetadata column : regularColumns) {
                rowData.addCell(new CellData(column.getName(), null, Long.valueOf(row.deletion().time().markedForDeleteAt()), CellData.ColumnType.REGULAR));
            }
            return;
        }
        if (rowType != RowType.INSERT && rowType != RowType.UPDATE) {
            return;
        }
        for (ColumnDefinition columnDefinition : row.columns()) {
            try {
                Object value;
                Long deletionTs = null;
                // Keep the declared type general; only the collection branch narrows it.
                AbstractType<?> type = columnDefinition.type;
                if (type.isCollection() && type.isMultiCell()) {
                    value = CassandraTypeDeserializer.deserialize((CollectionType<?>) type, row.getComplexColumnData(columnDefinition));
                } else {
                    Cell cell = row.getCell(columnDefinition);
                    value = cell.isTombstone() ? null : CassandraTypeDeserializer.deserialize(type, cell.value());
                    if (cell.isExpiring()) {
                        deletionTs = Long.valueOf(TimeUnit.MICROSECONDS.convert(cell.localDeletionTime(), TimeUnit.SECONDS));
                    }
                }
                rowData.addCell(new CellData(columnDefinition.name.toString(), value, deletionTs, CellData.ColumnType.REGULAR));
            } catch (Exception e) {
                throw new DebeziumException(String.format("Failed to populate Column %s with Type %s of Table %s in KeySpace %s.", columnDefinition.name.toString(), columnDefinition.type.toString(), columnDefinition.cfName, columnDefinition.ksName), e);
            }
        }
    }

    /**
     * Deserializes the partition key of an update into one value per partition key column.
     *
     * <p>With a single partition key column the whole key buffer is deserialized with that
     * column's type. Otherwise a duplicate of the key buffer is walked component by component.
     * A leading two-byte length of {@code 0xFFFF} is skipped if present. Each component is then
     * read as a short-length-prefixed value followed by one trailing byte, and reading stops when
     * that byte is non-zero, the buffer is exhausted, or all columns have been read.
     *
     * @param partitionUpdate the update providing the partition key and table metadata
     * @return the deserialized key values in column order; may hold fewer values than there are
     *         columns if reading stopped early
     * @throws DebeziumException if a component cannot be read or deserialized
     */
    private static List<Object> getPartitionKeys(PartitionUpdate partitionUpdate) {
        List<Object> values = new ArrayList<>();
        List<ColumnDefinition> keyColumns = partitionUpdate.metadata().partitionKeyColumns();

        if (keyColumns.size() == 1) {
            ByteBuffer rawKey = partitionUpdate.partitionKey().getKey();
            ColumnDefinition onlyColumn = keyColumns.get(0);
            try {
                values.add(CassandraTypeDeserializer.deserialize(onlyColumn.type, rawKey));
            } catch (Exception e) {
                throw new DebeziumException(String.format("Failed to deserialize Column %s with Type %s in Table %s and KeySpace %s.", onlyColumn.name.toString(), onlyColumn.type.toString(), onlyColumn.cfName, onlyColumn.ksName), e);
            }
            return values;
        }

        ByteBuffer keyBytes = partitionUpdate.partitionKey().getKey().duplicate();
        if (keyBytes.remaining() >= 2) {
            int header = ByteBufferUtil.getShortLength(keyBytes, keyBytes.position());
            if ((header & 0xFFFF) == 0xFFFF) {
                // Consume the 0xFFFF marker so the first component starts at the right offset.
                ByteBufferUtil.readShortLength(keyBytes);
            }
        }

        int index = 0;
        while (keyBytes.remaining() > 0 && index < keyColumns.size()) {
            ColumnDefinition column = keyColumns.get(index);
            try {
                values.add(CassandraTypeDeserializer.deserialize(column.type, ByteBufferUtil.readBytesWithShortLength(keyBytes)));
                // One trailing byte follows every component; a non-zero value ends the walk.
                if (keyBytes.get() != 0) {
                    break;
                }
            } catch (Exception e) {
                throw new DebeziumException(String.format("Failed to deserialize Column %s with Type %s in Table %s and KeySpace %s", column.name.toString(), column.type.toString(), column.cfName, column.ksName), e);
            }
            index++;
        }
        return values;
    }
}
