/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms;

import io.debezium.DebeziumException;
import io.debezium.Module;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.SchemaUtil;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.transforms.ConnectRecordUtil;
import io.debezium.transforms.SmtManager;
import io.debezium.util.BoundedConcurrentHashMap;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExtractSchemaToNewRecord<R extends ConnectRecord<R>>
implements Transformation<R>,
Versioned {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExtractSchemaToNewRecord.class);
    public static final String SOURCE_SCHEMA_KEY = "sourceSchema";
    private final ExtractField<R> afterDelegate = ConnectRecordUtil.extractAfterDelegate();
    private final ExtractField<R> beforeDelegate = ConnectRecordUtil.extractBeforeDelegate();
    private final BoundedConcurrentHashMap<Schema, NewRecordValueMetadata> recordValueSchemaCache = new BoundedConcurrentHashMap(10240);
    private final Field.Set configFields = Field.setOf(new Field[0]);
    private SchemaNameAdjuster schemaNameAdjuster;
    private SmtManager<R> smtManager;

    @Override
    public R apply(R record) {
        if (((ConnectRecord)record).value() == null) {
            return record;
        }
        if (!this.smtManager.isValidEnvelope(record)) {
            return record;
        }
        R afterRecord = this.afterDelegate.apply(record);
        R beforeRecord = this.beforeDelegate.apply(record);
        Struct oldValue = (Struct)((ConnectRecord)record).value();
        if (((ConnectRecord)afterRecord).value() == null && ((ConnectRecord)beforeRecord).value() == null) {
            return record;
        }
        NewRecordValueMetadata newRecordValueMetadata = this.recordValueSchemaCache.computeIfAbsent(((ConnectRecord)record).valueSchema(), key -> this.makeUpdatedSchema((Schema)key, oldValue, afterRecord));
        Struct newValue = new Struct(newRecordValueMetadata.schema);
        for (org.apache.kafka.connect.data.Field field : ((ConnectRecord)record).valueSchema().fields()) {
            Object value = oldValue.get(field);
            if (value == null) continue;
            newValue.put(field, value);
        }
        org.apache.kafka.connect.data.Field sourceSchemaField = newRecordValueMetadata.schema.field(SOURCE_SCHEMA_KEY);
        newValue.put(sourceSchemaField, (Object)newRecordValueMetadata.metadataValue);
        return ((ConnectRecord)record).newRecord(((ConnectRecord)record).topic(), ((ConnectRecord)record).kafkaPartition(), ((ConnectRecord)record).keySchema(), ((ConnectRecord)record).key(), newRecordValueMetadata.schema, newValue, ((ConnectRecord)record).timestamp(), ((ConnectRecord)record).headers());
    }

    @Override
    public ConfigDef config() {
        ConfigDef config = new ConfigDef();
        Field.group(config, null, this.configFields.asArray());
        return config;
    }

    @Override
    public void close() {
        this.afterDelegate.close();
        this.beforeDelegate.close();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        Configuration config = Configuration.from(configs);
        this.smtManager = new SmtManager(config);
        if (!config.validateAndRecord(this.validateConfigFields(), LOGGER::error)) {
            throw new DebeziumException("Unable to validate config.");
        }
        this.schemaNameAdjuster = CommonConnectorConfig.SchemaNameAdjustmentMode.parse(config.getString(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE)).createAdjuster();
    }

    @Override
    public String version() {
        return Module.version();
    }

    private Iterable<Field> validateConfigFields() {
        return this.configFields;
    }

    private NewRecordValueMetadata makeUpdatedSchema(Schema originalSchema, Struct originalValue, R afterRecord) {
        SchemaBuilder builder = SchemaUtil.copySchema(originalSchema);
        Schema sourceSchemaBlockSchema = SchemaFactory.get().sourceSchemaBlockSchema(this.schemaNameAdjuster);
        builder.field(SOURCE_SCHEMA_KEY, sourceSchemaBlockSchema);
        Schema newValueSchema = builder.build();
        Struct sourceSchemaStruct = new Struct(sourceSchemaBlockSchema);
        Struct sourceStruct = (Struct)originalValue.get("source");
        sourceSchemaStruct.put("id", sourceStruct.get("table"));
        Struct table = new Struct(SchemaFactory.get().sourceSchemaBlockTableSchema(this.schemaNameAdjuster));
        ArrayList columns = new ArrayList();
        ((ConnectRecord)afterRecord).valueSchema().fields().forEach(field -> {
            Struct column = new Struct(SchemaFactory.get().sourceSchemaBlockColumnSchema(this.schemaNameAdjuster));
            Optional<String> nameOpt = SchemaUtil.getSourceColumnName(field.schema());
            Optional<String> typeNameOpt = SchemaUtil.getSourceColumnType(field.schema());
            if (nameOpt.isEmpty() || typeNameOpt.isEmpty()) {
                throw new DebeziumException("Ensure that enable configurations \"column.propagate.source.type\" or \"datatype.propagate.source.type\" and the value is set to \".*\"");
            }
            nameOpt.ifPresent(s2 -> column.put("name", s2));
            typeNameOpt.ifPresent(s2 -> column.put("typeName", s2));
            SchemaUtil.getSourceColumnSize(field.schema()).ifPresent(s2 -> column.put("length", (Object)Integer.parseInt(s2)));
            SchemaUtil.getSourceColumnPrecision(field.schema()).ifPresent(s2 -> column.put("scale", (Object)Integer.parseInt(s2)));
            SchemaUtil.getSourceColumnComment(field.schema()).ifPresent(s2 -> column.put("comment", s2));
            columns.add(column);
        });
        table.put("columns", columns);
        sourceSchemaStruct.put("table", (Object)table);
        return new NewRecordValueMetadata(newValueSchema, sourceSchemaStruct);
    }

    private static class NewRecordValueMetadata {
        private final Schema schema;
        private final Struct metadataValue;

        NewRecordValueMetadata(Schema schema, Struct metadataValue) {
            this.schema = schema;
            this.metadataValue = metadataValue;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            NewRecordValueMetadata metadata = (NewRecordValueMetadata)o;
            return Objects.equals(this.schema, metadata.schema) && Objects.equals(this.metadataValue, metadata.metadataValue);
        }

        public int hashCode() {
            return Objects.hash(this.schema, this.metadataValue);
        }

        public String toString() {
            return "NewRecordValueMetadata{" + this.schema + ":" + this.metadataValue + "}";
        }
    }
}

