package io.debezium.embedded;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.engine.Header;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.Binary;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.format.Protobuf;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.engine.format.SimpleString;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.InsertHeader;

/* loaded from: input_file:io/debezium/embedded/ConverterBuilder.class */
public class ConverterBuilder<R> {
    private static final String CONVERTER_PREFIX = "converter";
    private static final String HEADER_CONVERTER_PREFIX = "header.converter";
    private static final String KEY_CONVERTER_PREFIX = "key.converter";
    private static final String VALUE_CONVERTER_PREFIX = "value.converter";
    private static final String FIELD_CLASS = "class";
    private static final String TOPIC_NAME = "debezium";
    private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url";
    private Class<? extends SerializationFormat<?>> formatHeader;
    private Class<? extends SerializationFormat<?>> formatKey;
    private Class<? extends SerializationFormat<?>> formatValue;
    private Configuration config;

    public ConverterBuilder<R> using(KeyValueHeaderChangeEventFormat<?, ?, ?> keyValueHeaderChangeEventFormat) {
        this.formatKey = keyValueHeaderChangeEventFormat.getKeyFormat();
        this.formatValue = keyValueHeaderChangeEventFormat.getValueFormat();
        this.formatHeader = keyValueHeaderChangeEventFormat.getHeaderFormat();
        return this;
    }

    public ConverterBuilder<R> using(Properties properties) {
        this.config = Configuration.from(properties);
        return this;
    }

    public HeaderConverter headerConverter() {
        if (this.formatValue == Connect.class || this.formatHeader == null) {
            return null;
        }
        return createHeaderConverter(this.formatHeader);
    }

    public Function<SourceRecord, R> toFormat(HeaderConverter headerConverter) {
        Function<SourceRecord, R> function;
        if (this.formatValue == Connect.class) {
            function = sourceRecord -> {
                return new EmbeddedEngineChangeEvent(null, sourceRecord, (List) StreamSupport.stream(sourceRecord.headers().spliterator(), false).map(EmbeddedEngineHeader::new).collect(Collectors.toList()), sourceRecord);
            };
        } else {
            Converter createConverter = createConverter(this.formatKey, true);
            Converter createConverter2 = createConverter(this.formatValue, false);
            function = sourceRecord2 -> {
                String str = sourceRecord2.topic();
                if (str == null) {
                    str = "debezium";
                }
                byte[] fromConnectData = createConverter.fromConnectData(str, sourceRecord2.keySchema(), sourceRecord2.key());
                byte[] fromConnectData2 = createConverter2.fromConnectData(str, sourceRecord2.valueSchema(), sourceRecord2.value());
                List<Header<byte[]>> emptyList = Collections.emptyList();
                if (headerConverter != null) {
                    List<Header<byte[]>> convertHeaders = convertHeaders(sourceRecord2, str, headerConverter);
                    emptyList = convertHeaders;
                    if (shouldConvertHeadersToString()) {
                        emptyList = (List) convertHeaders.stream().map(header -> {
                            return new EmbeddedEngineHeader(header.getKey(), new String((byte[]) header.getValue(), StandardCharsets.UTF_8));
                        }).collect(Collectors.toList());
                    }
                }
                Object obj = fromConnectData;
                Object obj2 = fromConnectData2;
                if (fromConnectData != null && shouldConvertKeyToString()) {
                    obj = new String(fromConnectData, StandardCharsets.UTF_8);
                }
                if (fromConnectData2 != null && shouldConvertValueToString()) {
                    obj2 = new String(fromConnectData2, StandardCharsets.UTF_8);
                }
                return new EmbeddedEngineChangeEvent(obj, obj2, emptyList, sourceRecord2);
            };
        }
        return function;
    }

    public Function<R, SourceRecord> fromFormat() {
        return obj -> {
            return ((EmbeddedEngineChangeEvent) obj).sourceRecord();
        };
    }

    private static boolean isFormat(Class<? extends SerializationFormat<?>> cls, Class<? extends SerializationFormat<?>> cls2) {
        return cls == cls2;
    }

    private boolean shouldConvertKeyToString() {
        return isFormat(this.formatKey, Json.class) || isFormat(this.formatKey, SimpleString.class);
    }

    private boolean shouldConvertValueToString() {
        return isFormat(this.formatValue, Json.class) || isFormat(this.formatValue, SimpleString.class) || isFormat(this.formatValue, CloudEvents.class);
    }

    private boolean shouldConvertHeadersToString() {
        return isFormat(this.formatHeader, Json.class);
    }

    private List<Header<byte[]>> convertHeaders(SourceRecord sourceRecord, String str, HeaderConverter headerConverter) {
        ArrayList arrayList = new ArrayList();
        for (org.apache.kafka.connect.header.Header header : sourceRecord.headers()) {
            String key = header.key();
            arrayList.add(new EmbeddedEngineHeader(key, headerConverter.fromConnectHeader(str, key, header.schema(), header.value())));
        }
        return arrayList;
    }

    private HeaderConverter createHeaderConverter(Class<? extends SerializationFormat<?>> cls) {
        Configuration build = this.config.subset(CONVERTER_PREFIX, true).edit().with(this.config.subset("header.converter", true)).with(ConverterConfig.TYPE_CONFIG, InsertHeader.HEADER_FIELD).build();
        if (!isFormat(cls, Json.class) && !isFormat(cls, JsonByteArray.class)) {
            throw new DebeziumException("Header Converter '" + cls.getSimpleName() + "' is not supported");
        }
        Configuration build2 = build.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
        HeaderConverter headerConverter = (HeaderConverter) build2.getInstance(FIELD_CLASS, HeaderConverter.class);
        headerConverter.configure(build2.asMap());
        return headerConverter;
    }

    private Converter createConverter(Class<? extends SerializationFormat<?>> cls, boolean z) {
        Configuration build;
        Configuration build2 = this.config.subset(CONVERTER_PREFIX, true).edit().with(this.config.subset(z ? "key.converter" : "value.converter", true)).build();
        if (isFormat(cls, Json.class) || isFormat(cls, JsonByteArray.class)) {
            build = build2.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG) ? build2.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.ExtJsonConverter").build() : build2.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
        } else if (isFormat(cls, CloudEvents.class)) {
            build = build2.edit().withDefault(FIELD_CLASS, "io.debezium.converters.CloudEventsConverter").build();
        } else if (isFormat(cls, Avro.class)) {
            build = (build2.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG) ? build2.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.AvroConverter").build() : build2.edit().withDefault(FIELD_CLASS, "io.confluent.connect.avro.AvroConverter").build()).edit().withDefault(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO).build();
        } else if (isFormat(cls, Protobuf.class)) {
            build = build2.edit().withDefault(FIELD_CLASS, "io.confluent.connect.protobuf.ProtobufConverter").build();
        } else if (isFormat(cls, Binary.class)) {
            build = build2.edit().withDefault(FIELD_CLASS, "io.debezium.converters.BinaryDataConverter").build();
        } else {
            if (!isFormat(cls, SimpleString.class)) {
                throw new DebeziumException("Converter '" + cls.getSimpleName() + "' is not supported");
            }
            build = build2.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.storage.StringConverter").build();
        }
        Converter converter = (Converter) build.getInstance(FIELD_CLASS, Converter.class);
        converter.configure(build.asMap(), z);
        return converter;
    }
}
