/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.util.annotations.ScriptApi;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.function.IntPredicate;
import org.apache.avro.Schema;
import org.jetbrains.annotations.NotNull;

public class CdcTools {
    public static final String KEY_AVRO_SCHEMA_SUFFIX = "-key";
    public static final String VALUE_AVRO_SCHEMA_SUFFIX = "-value";
    public static final String CDC_TOPIC_NAME_SEPARATOR = ".";
    public static final String CDC_AFTER_COLUMN_PREFIX = "after__";
    public static final String CDC_OP_COLUMN_NAME = "op";
    public static final String CDC_DELETE_OP_VALUE = "d";

    @ScriptApi
    public static CdcSpec cdcLongSpec(String topic, String keySchemaName, String valueSchemaName) {
        return new CdcSpecTopicSchemas(topic, keySchemaName, "latest", valueSchemaName, "latest");
    }

    @ScriptApi
    public static CdcSpec cdcLongSpec(String topic, String keySchemaName, String keySchemaVersion, String valueSchemaName, String valueSchemaVersion) {
        return new CdcSpecTopicSchemas(topic, keySchemaName, keySchemaVersion, valueSchemaName, valueSchemaVersion);
    }

    @ScriptApi
    public static CdcSpec cdcShortSpec(String serverName, String dbName, String tableName) {
        return new CdcSpecServerDbTable(serverName, dbName, tableName);
    }

    @ScriptApi
    public static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull CdcSpec cdcSpec, @NotNull IntPredicate partitionFilter) {
        return CdcTools.consumeToTable(kafkaProperties, cdcSpec, partitionFilter, false, null);
    }

    @ScriptApi
    public static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull CdcSpec cdcSpec, @NotNull IntPredicate partitionFilter, boolean asBlinkTable, Collection<String> dropColumns) {
        Schema valueSchema = KafkaTools.getAvroSchema(kafkaProperties, cdcSpec.valueSchemaName(), cdcSpec.valueSchemaVersion());
        Schema keySchema = KafkaTools.getAvroSchema(kafkaProperties, cdcSpec.keySchemaName(), cdcSpec.keySchemaVersion());
        Table streamingIn = KafkaTools.consumeToTable(kafkaProperties, cdcSpec.topic(), partitionFilter, KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING, KafkaTools.Consume.avroSpec(keySchema), KafkaTools.Consume.avroSpec(valueSchema), KafkaTools.TableType.blink());
        List<String> dbTableColumnNames = CdcTools.dbTableColumnNames(streamingIn);
        ArrayList<String> allDroppedColumns = null;
        if (dropColumns != null && dropColumns.size() > 0) {
            allDroppedColumns = new ArrayList<String>(dropColumns);
        }
        if (!asBlinkTable) {
            if (allDroppedColumns == null) {
                allDroppedColumns = new ArrayList(1);
            }
            allDroppedColumns.add(CDC_OP_COLUMN_NAME);
        }
        List<String> dbTableKeyColumnNames = CdcTools.fieldNames(keySchema);
        Table narrowerStreamingTable = (Table)streamingIn.view(CdcTools.narrowerStreamingTableViewExpressions(dbTableKeyColumnNames, dbTableColumnNames));
        if (asBlinkTable) {
            if (allDroppedColumns != null) {
                return (Table)narrowerStreamingTable.dropColumns(allDroppedColumns);
            }
            return narrowerStreamingTable;
        }
        Table cdc = (Table)((Table)((Table)narrowerStreamingTable.lastBy(dbTableKeyColumnNames)).where(new String[]{"op != `d`"})).dropColumns(allDroppedColumns);
        return cdc;
    }

    @ScriptApi
    public static Table consumeRawToTable(@NotNull Properties kafkaProperties, @NotNull CdcSpec cdcSpec, @NotNull IntPredicate partitionFilter, @NotNull KafkaTools.TableType tableType) {
        return KafkaTools.consumeToTable(kafkaProperties, cdcSpec.topic(), partitionFilter, KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING, KafkaTools.Consume.avroSpec(cdcSpec.keySchemaName(), cdcSpec.keySchemaVersion()), KafkaTools.Consume.avroSpec(cdcSpec.valueSchemaName(), cdcSpec.valueSchemaVersion()), tableType);
    }

    private static String[] narrowerStreamingTableViewExpressions(List<String> dbTableKeyColumnNames, List<String> dbTableColumnNames) {
        String[] viewExpressions = new String[dbTableColumnNames.size() + 1];
        int i = 0;
        HashSet<String> keyColumnsSet = new HashSet<String>(dbTableKeyColumnNames);
        for (String columnName : dbTableColumnNames) {
            if (dbTableKeyColumnNames.contains(columnName)) {
                viewExpressions[i++] = columnName;
                continue;
            }
            viewExpressions[i++] = columnName + "=after__" + columnName;
        }
        viewExpressions[i++] = CDC_OP_COLUMN_NAME;
        return viewExpressions;
    }

    private static List<String> dbTableColumnNames(Table streamingIn) {
        ArrayList<String> columnNames = new ArrayList<String>();
        int nameOffset = CDC_AFTER_COLUMN_PREFIX.length();
        for (ColumnDefinition col : streamingIn.getDefinition().getColumns()) {
            String name = col.getName();
            if (!name.startsWith(CDC_AFTER_COLUMN_PREFIX)) continue;
            columnNames.add(name.substring(nameOffset));
        }
        return columnNames;
    }

    private static List<String> fieldNames(Schema schema) {
        ArrayList<String> fieldNames = new ArrayList<String>();
        for (Schema.Field field : schema.getFields()) {
            fieldNames.add(field.name());
        }
        return fieldNames;
    }

    @ScriptApi
    public static String cdcTopicName(String serverName, String dbName, String tableName) {
        return serverName + CDC_TOPIC_NAME_SEPARATOR + dbName + CDC_TOPIC_NAME_SEPARATOR + tableName;
    }

    @ScriptApi
    public static String cdcKeyAvroSchemaName(String serverName, String dbName, String tableName) {
        return CdcTools.cdcTopicName(serverName, dbName, tableName) + KEY_AVRO_SCHEMA_SUFFIX;
    }

    @ScriptApi
    public static String cdcValueAvroSchemaName(String serverName, String dbName, String tableName) {
        return CdcTools.cdcTopicName(serverName, dbName, tableName) + VALUE_AVRO_SCHEMA_SUFFIX;
    }

    private static class CdcSpecServerDbTable
    implements CdcSpec {
        public final String serverName;
        public final String dbName;
        public final String tableName;

        private CdcSpecServerDbTable(String serverName, String dbName, String tableName) {
            this.serverName = serverName;
            this.dbName = dbName;
            this.tableName = tableName;
        }

        @Override
        public String topic() {
            return CdcTools.cdcTopicName(this.serverName, this.dbName, this.tableName);
        }

        @Override
        public String keySchemaName() {
            return CdcTools.cdcKeyAvroSchemaName(this.serverName, this.dbName, this.tableName);
        }

        @Override
        public String keySchemaVersion() {
            return "latest";
        }

        @Override
        public String valueSchemaName() {
            return CdcTools.cdcValueAvroSchemaName(this.serverName, this.dbName, this.tableName);
        }

        @Override
        public String valueSchemaVersion() {
            return "latest";
        }
    }

    private static class CdcSpecTopicSchemas
    implements CdcSpec {
        public final String topic;
        public final String keySchemaName;
        public final String keySchemaVersion;
        public final String valueSchemaName;
        public final String valueSchemaVersion;

        private CdcSpecTopicSchemas(String topic, String keySchemaName, String keySchemaVersion, String valueSchemaName, String valueSchemaVersion) {
            this.topic = topic;
            this.keySchemaName = keySchemaName;
            this.keySchemaVersion = keySchemaVersion;
            this.valueSchemaName = valueSchemaName;
            this.valueSchemaVersion = valueSchemaVersion;
        }

        @Override
        public String topic() {
            return this.topic;
        }

        @Override
        public String keySchemaName() {
            return this.keySchemaName;
        }

        @Override
        public String keySchemaVersion() {
            return this.keySchemaVersion;
        }

        @Override
        public String valueSchemaName() {
            return this.valueSchemaName;
        }

        @Override
        public String valueSchemaVersion() {
            return this.valueSchemaVersion;
        }
    }

    public static interface CdcSpec {
        public String topic();

        public String keySchemaName();

        public String keySchemaVersion();

        public String valueSchemaName();

        public String valueSchemaVersion();
    }
}

