package io.debezium.embedded;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.Protobuf;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.engine.spi.OffsetCommitPolicy;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;

/* loaded from: input_file:io/debezium/embedded/ConvertingEngineBuilder.class */
/**
 * A {@link DebeziumEngine.Builder} that wraps the {@link SourceRecord}-based builder returned by
 * {@link EmbeddedEngine#create()} and converts each record into the requested output type
 * {@code R} before handing it to the user-supplied consumer.
 *
 * <p>The conversion functions are installed by {@link #build2()}; the consumers registered through
 * the {@code notifying2} overloads look them up lazily each time a record is delivered.
 *
 * <p>Method names ending in {@code 2} are decompiler-assigned names for the corresponding
 * {@code DebeziumEngine.Builder} methods ({@code notifying}, {@code using}, {@code build}).
 *
 * @param <R> the type of change event delivered to the consumer
 */
public class ConvertingEngineBuilder<R> implements DebeziumEngine.Builder<R> {
    private static final String CONVERTER_PREFIX = "converter";
    private static final String KEY_CONVERTER_PREFIX = "key.converter";
    private static final String VALUE_CONVERTER_PREFIX = "value.converter";
    private static final String FIELD_CLASS = "class";
    /** Topic name passed to the converters when a record carries no topic of its own. */
    private static final String TOPIC_NAME = "debezium";
    private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url";

    private final DebeziumEngine.Builder<SourceRecord> delegate;
    /** Key serialization format; {@code null} when built from a value-only {@link ChangeEventFormat}. */
    private final Class<? extends SerializationFormat<?>> formatKey;
    private final Class<? extends SerializationFormat<?>> formatValue;
    private Configuration config;
    private Function<SourceRecord, R> toFormat;
    private Function<R, SourceRecord> fromFormat;

    /**
     * Creates a builder for a value-only format; no key format is recorded.
     *
     * @param changeEventFormat supplies the value serialization format
     */
    public ConvertingEngineBuilder(ChangeEventFormat<?> changeEventFormat) {
        this.delegate = EmbeddedEngine.create();
        this.formatKey = null;
        this.formatValue = changeEventFormat.getValueFormat();
    }

    /**
     * Creates a builder with separate key and value formats.
     *
     * @param keyValueChangeEventFormat supplies the key and value serialization formats
     */
    public ConvertingEngineBuilder(KeyValueChangeEventFormat<?, ?> keyValueChangeEventFormat) {
        this.delegate = EmbeddedEngine.create();
        this.formatKey = keyValueChangeEventFormat.getKeyFormat();
        this.formatValue = keyValueChangeEventFormat.getValueFormat();
    }

    /**
     * Registers a per-record consumer. Each {@link SourceRecord} produced by the delegate is
     * converted with the function installed by {@link #build2()} and then passed to
     * {@code consumer}.
     *
     * @param consumer receives every converted event
     * @return this builder
     */
    @Override
    public DebeziumEngine.Builder<R> notifying2(Consumer<R> consumer) {
        this.delegate.notifying2(record -> consumer.accept(this.toFormat.apply(record)));
        return this;
    }

    /** Returns {@code true} when both arguments are the very same class object (or both {@code null}). */
    private boolean isFormat(Class<? extends SerializationFormat<?>> cls, Class<? extends SerializationFormat<?>> cls2) {
        return cls == cls2;
    }

    /**
     * Registers a batch consumer. The whole batch is converted up front and handed to
     * {@code changeConsumer} together with a committer that maps converted events back to their
     * originating {@link SourceRecord} before delegating to the underlying committer.
     *
     * @param changeConsumer receives each converted batch
     * @return this builder
     */
    @Override
    public DebeziumEngine.Builder<R> notifying2(DebeziumEngine.ChangeConsumer<R> changeConsumer) {
        this.delegate.notifying2((records, recordCommitter) -> {
            final List<R> converted = records.stream()
                    .map(record -> this.toFormat.apply(record))
                    .collect(Collectors.toList());
            changeConsumer.handleBatch(converted, new DebeziumEngine.RecordCommitter<R>() {
                @Override
                public void markProcessed(R r) throws InterruptedException {
                    recordCommitter.markProcessed(ConvertingEngineBuilder.this.fromFormat.apply(r));
                }

                @Override
                public void markBatchFinished() throws InterruptedException {
                    recordCommitter.markBatchFinished();
                }

                @Override
                public void markProcessed(R r, DebeziumEngine.Offsets offsets) throws InterruptedException {
                    recordCommitter.markProcessed(ConvertingEngineBuilder.this.fromFormat.apply(r), offsets);
                }

                @Override
                public DebeziumEngine.Offsets buildOffsets() {
                    return recordCommitter.buildOffsets();
                }
            });
        });
        return this;
    }

    /**
     * Supplies the engine properties. They are forwarded to the delegate and also kept as a
     * {@link Configuration} from which the converter settings are read in {@link #build2()}.
     *
     * @param properties the engine and converter configuration
     * @return this builder
     */
    @Override
    public DebeziumEngine.Builder<R> using2(Properties properties) {
        this.config = Configuration.from(properties);
        this.delegate.using2(properties);
        return this;
    }

    /** Forwards the class loader to the delegate builder. */
    @Override
    public DebeziumEngine.Builder<R> using2(ClassLoader classLoader) {
        this.delegate.using2(classLoader);
        return this;
    }

    /** Forwards the clock to the delegate builder. */
    @Override
    public DebeziumEngine.Builder<R> using2(Clock clock) {
        this.delegate.using2(clock);
        return this;
    }

    /** Forwards the completion callback to the delegate builder. */
    @Override
    public DebeziumEngine.Builder<R> using2(DebeziumEngine.CompletionCallback completionCallback) {
        this.delegate.using2(completionCallback);
        return this;
    }

    /** Forwards the connector callback to the delegate builder. */
    @Override
    public DebeziumEngine.Builder<R> using2(DebeziumEngine.ConnectorCallback connectorCallback) {
        this.delegate.using2(connectorCallback);
        return this;
    }

    /** Forwards the offset commit policy to the delegate builder. */
    @Override
    public DebeziumEngine.Builder<R> using2(OffsetCommitPolicy offsetCommitPolicy) {
        this.delegate.using2(offsetCommitPolicy);
        return this;
    }

    /**
     * Builds the delegate engine, installs the record conversion functions and returns an engine
     * whose {@code run()} and {@code close()} simply forward to the delegate.
     *
     * <p>For the {@code Connect} value format the record itself is used as the event value and no
     * converters are created. Otherwise a key and a value converter are instantiated from the
     * configuration; their output is exposed as UTF-8 decoded strings when key and value are both
     * JSON or the value is CloudEvents, and as raw byte arrays in every other case.
     *
     * @return the converting engine
     * @throws DebeziumException if a required serialization format is missing or unsupported
     */
    @Override
    public DebeziumEngine<R> build2() {
        final DebeziumEngine<SourceRecord> engine = this.delegate.build2();
        if (this.formatValue == Connect.class) {
            this.toFormat = record -> newEvent(null, record, record);
        } else {
            final Converter keyConverter = createConverter(this.formatKey, true);
            final Converter valueConverter = createConverter(this.formatValue, false);
            // Both formats are final, so the text-vs-bytes decision is made once rather than per record.
            final boolean textual = (isFormat(this.formatKey, Json.class) && isFormat(this.formatValue, Json.class))
                    || isFormat(this.formatValue, CloudEvents.class);
            this.toFormat = record -> {
                String topic = record.topic();
                if (topic == null) {
                    topic = TOPIC_NAME;
                }
                final byte[] key = keyConverter.fromConnectData(topic, record.keySchema(), record.key());
                final byte[] value = valueConverter.fromConnectData(topic, record.valueSchema(), record.value());
                return textual
                        ? newEvent(decode(key), decode(value), record)
                        : newEvent(key, value, record);
            };
        }
        this.fromFormat = ConvertingEngineBuilder::sourceRecordOf;
        return new DebeziumEngine<R>() {
            @Override
            public void run() {
                engine.run();
            }

            @Override
            public void close() throws IOException {
                engine.close();
            }
        };
    }

    /**
     * Instantiates and configures the converter for the given serialization format.
     *
     * <p>The configuration is the {@code converter.*} subset merged with the
     * {@code key.converter.*} or {@code value.converter.*} subset. A default converter class is
     * filled in per format when the {@code class} property is absent; for JSON and Avro the default
     * depends on whether {@code apicurio.registry.url} is present in the merged configuration.
     *
     * @param cls the serialization format; may be {@code null} when no format was supplied
     * @param z   {@code true} for the key converter, {@code false} for the value converter
     * @return the configured converter
     * @throws DebeziumException if {@code cls} is {@code null} or not one of the supported formats
     */
    private Converter createConverter(Class<? extends SerializationFormat<?>> cls, boolean z) {
        final Configuration merged = this.config.subset(CONVERTER_PREFIX, true).edit()
                .with(this.config.subset(z ? KEY_CONVERTER_PREFIX : VALUE_CONVERTER_PREFIX, true))
                .build();
        final boolean apicurio = merged.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG);

        final String defaultClass;
        if (isFormat(cls, Json.class)) {
            defaultClass = apicurio
                    ? "io.apicurio.registry.utils.converter.ExtJsonConverter"
                    : "org.apache.kafka.connect.json.JsonConverter";
        } else if (isFormat(cls, CloudEvents.class)) {
            defaultClass = "io.debezium.converters.CloudEventsConverter";
        } else if (isFormat(cls, Avro.class)) {
            defaultClass = apicurio
                    ? "io.apicurio.registry.utils.converter.AvroConverter"
                    : "io.confluent.connect.avro.AvroConverter";
        } else if (isFormat(cls, Protobuf.class)) {
            defaultClass = "io.confluent.connect.protobuf.ProtobufConverter";
        } else if (cls == null) {
            // Reached e.g. for the key converter of a value-only format; fail with a readable
            // message instead of a NullPointerException from cls.getSimpleName().
            throw new DebeziumException(
                    "No serialization format was specified for the " + (z ? "key" : "value") + " converter");
        } else {
            throw new DebeziumException("Converter '" + cls.getSimpleName() + "' is not supported");
        }

        final Configuration converterConfig = merged.edit().withDefault(FIELD_CLASS, defaultClass).build();
        final Converter converter = (Converter) converterConfig.getInstance(FIELD_CLASS, Converter.class);
        converter.configure(converterConfig.asMap(), z);
        return converter;
    }

    /**
     * Wraps key, value and originating record in an {@code EmbeddedEngineChangeEvent} typed as
     * {@code R}. The cast is unchecked: callers select {@code R} through the format passed to the
     * constructor, which this class cannot verify.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private R newEvent(Object key, Object value, SourceRecord record) {
        return (R) new EmbeddedEngineChangeEvent(key, value, record);
    }

    /** Extracts the originating record from an event previously created by this builder. */
    @SuppressWarnings("rawtypes")
    private static SourceRecord sourceRecordOf(Object event) {
        return ((EmbeddedEngineChangeEvent) event).sourceRecord();
    }

    /** Decodes converter output as UTF-8 text, passing {@code null} through unchanged. */
    private static String decode(byte[] bytes) {
        return bytes != null ? new String(bytes, StandardCharsets.UTF_8) : null;
    }
}
