/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.embedded.EmbeddedEngineHeader;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.Binary;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.format.Protobuf;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.engine.format.SimpleString;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;

public class ConverterBuilder<R> {
    private static final String CONVERTER_PREFIX = "converter";
    private static final String HEADER_CONVERTER_PREFIX = "header.converter";
    private static final String KEY_CONVERTER_PREFIX = "key.converter";
    private static final String VALUE_CONVERTER_PREFIX = "value.converter";
    private static final String FIELD_CLASS = "class";
    private static final String TOPIC_NAME = "debezium";
    private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url";
    private Class<? extends SerializationFormat<?>> formatHeader;
    private Class<? extends SerializationFormat<?>> formatKey;
    private Class<? extends SerializationFormat<?>> formatValue;
    private Configuration config;

    public ConverterBuilder<R> using(KeyValueHeaderChangeEventFormat<?, ?, ?> format) {
        this.formatKey = format.getKeyFormat();
        this.formatValue = format.getValueFormat();
        this.formatHeader = format.getHeaderFormat();
        return this;
    }

    public ConverterBuilder<R> using(Properties config) {
        this.config = Configuration.from(config);
        return this;
    }

    public HeaderConverter headerConverter() {
        if (this.formatValue == Connect.class) {
            return null;
        }
        if (this.formatHeader == null) {
            return null;
        }
        return this.createHeaderConverter(this.formatHeader);
    }

    public Function<SourceRecord, R> toFormat(HeaderConverter headerConverter) {
        Function<SourceRecord, Object> toFormat;
        if (this.formatValue == Connect.class) {
            toFormat = record -> new EmbeddedEngineChangeEvent(null, (SourceRecord)record, StreamSupport.stream(record.headers().spliterator(), false).map(EmbeddedEngineHeader::new).collect(Collectors.toList()), (SourceRecord)record);
        } else {
            Converter keyConverter = this.createConverter(this.formatKey, true);
            Converter valueConverter = this.createConverter(this.formatValue, false);
            toFormat = record -> {
                String topicName = record.topic();
                if (topicName == null) {
                    topicName = TOPIC_NAME;
                }
                byte[] key = keyConverter.fromConnectData(topicName, record.keySchema(), record.key());
                byte[] value = valueConverter.fromConnectData(topicName, record.valueSchema(), record.value());
                List<Object> headers = Collections.emptyList();
                if (headerConverter != null) {
                    List<io.debezium.engine.Header<byte[]>> byteArrayHeaders = this.convertHeaders((SourceRecord)record, topicName, headerConverter);
                    headers = byteArrayHeaders;
                    if (this.shouldConvertHeadersToString()) {
                        headers = byteArrayHeaders.stream().map(h2 -> new EmbeddedEngineHeader<String>(h2.getKey(), new String((byte[])h2.getValue(), StandardCharsets.UTF_8))).collect(Collectors.toList());
                    }
                }
                Object convertedKey = key;
                Object convertedValue = value;
                if (key != null && this.shouldConvertKeyToString()) {
                    convertedKey = new String(key, StandardCharsets.UTF_8);
                }
                if (value != null && this.shouldConvertValueToString()) {
                    convertedValue = new String(value, StandardCharsets.UTF_8);
                }
                return new EmbeddedEngineChangeEvent((byte[])convertedKey, (byte[])convertedValue, headers, (SourceRecord)record);
            };
        }
        return toFormat;
    }

    public Function<R, SourceRecord> fromFormat() {
        return record -> ((EmbeddedEngineChangeEvent)record).sourceRecord();
    }

    private static boolean isFormat(Class<? extends SerializationFormat<?>> format1, Class<? extends SerializationFormat<?>> format2) {
        return format1 == format2;
    }

    private boolean shouldConvertKeyToString() {
        return ConverterBuilder.isFormat(this.formatKey, Json.class) || ConverterBuilder.isFormat(this.formatKey, SimpleString.class);
    }

    private boolean shouldConvertValueToString() {
        return ConverterBuilder.isFormat(this.formatValue, Json.class) || ConverterBuilder.isFormat(this.formatValue, SimpleString.class) || ConverterBuilder.isFormat(this.formatValue, CloudEvents.class);
    }

    private boolean shouldConvertHeadersToString() {
        return ConverterBuilder.isFormat(this.formatHeader, Json.class);
    }

    private List<io.debezium.engine.Header<byte[]>> convertHeaders(SourceRecord record, String topicName, HeaderConverter headerConverter) {
        ArrayList<io.debezium.engine.Header<byte[]>> headers = new ArrayList<io.debezium.engine.Header<byte[]>>();
        for (Header header : record.headers()) {
            String headerKey = header.key();
            byte[] rawHeader = headerConverter.fromConnectHeader(topicName, headerKey, header.schema(), header.value());
            headers.add(new EmbeddedEngineHeader<byte[]>(headerKey, rawHeader));
        }
        return headers;
    }

    private HeaderConverter createHeaderConverter(Class<? extends SerializationFormat<?>> format) {
        Configuration converterConfig = this.config.subset(HEADER_CONVERTER_PREFIX, true);
        Configuration commonConverterConfig = this.config.subset(CONVERTER_PREFIX, true);
        converterConfig = ((Configuration.Builder)commonConverterConfig.edit().with(converterConfig)).with("converter.type", "header").build();
        if (!ConverterBuilder.isFormat(format, Json.class) && !ConverterBuilder.isFormat(format, JsonByteArray.class)) {
            throw new DebeziumException("Header Converter '" + format.getSimpleName() + "' is not supported");
        }
        converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
        HeaderConverter converter = converterConfig.getInstance(FIELD_CLASS, HeaderConverter.class);
        converter.configure(converterConfig.asMap());
        return converter;
    }

    private Converter createConverter(Class<? extends SerializationFormat<?>> format, boolean key) {
        Configuration converterConfig = this.config.subset(key ? KEY_CONVERTER_PREFIX : VALUE_CONVERTER_PREFIX, true);
        Configuration commonConverterConfig = this.config.subset(CONVERTER_PREFIX, true);
        converterConfig = ((Configuration.Builder)commonConverterConfig.edit().with(converterConfig)).build();
        if (ConverterBuilder.isFormat(format, Json.class) || ConverterBuilder.isFormat(format, JsonByteArray.class)) {
            converterConfig = converterConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG) ? converterConfig.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.ExtJsonConverter").build() : converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
        } else if (ConverterBuilder.isFormat(format, CloudEvents.class)) {
            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.debezium.converters.CloudEventsConverter").build();
        } else if (ConverterBuilder.isFormat(format, Avro.class)) {
            converterConfig = converterConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG) ? converterConfig.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.AvroConverter").build() : converterConfig.edit().withDefault(FIELD_CLASS, "io.confluent.connect.avro.AvroConverter").build();
            converterConfig = ((Configuration.Builder)converterConfig.edit().withDefault(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, (Object)CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO)).build();
        } else if (ConverterBuilder.isFormat(format, Protobuf.class)) {
            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.confluent.connect.protobuf.ProtobufConverter").build();
        } else if (ConverterBuilder.isFormat(format, Binary.class)) {
            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.debezium.converters.BinaryDataConverter").build();
        } else if (ConverterBuilder.isFormat(format, SimpleString.class)) {
            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.storage.StringConverter").build();
        } else {
            throw new DebeziumException("Converter '" + format.getSimpleName() + "' is not supported");
        }
        Converter converter = converterConfig.getInstance(FIELD_CLASS, Converter.class);
        converter.configure(converterConfig.asMap(), key);
        return converter;
    }
}

