package io.debezium.connector.jdbc.e2e;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.JdbcSinkConnectorTask;
import io.debezium.connector.jdbc.JdbcSinkTaskTestContext;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.Source;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.ConverterType;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/jdbc/e2e/AbstractJdbcSinkIT.class */
public abstract class AbstractJdbcSinkIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcSinkIT.class);
    private JsonConverter keyConverter;
    private JsonConverter valueConverter;
    private JdbcSinkConnectorTask sinkTask;
    private KafkaConsumer<byte[], byte[]> consumer;
    private final ConcurrentLinkedQueue<SinkRecord> consumerRecords = new ConcurrentLinkedQueue<>();
    private CountDownLatch stopLatch = new CountDownLatch(1);
    private ExecutorService sinkExecutor;
    private JdbcSinkConnectorConfig currentSinkConfig;
    private TimeZone currentSinkTimeZone;

    @AfterEach
    public void afterEach() throws Exception {
        stopSink();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JdbcSinkConnectorConfig getCurrentSinkConfig() {
        return this.currentSinkConfig;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TimeZone getCurrentSinkTimeZone() {
        if (this.currentSinkTimeZone == null) {
            this.currentSinkTimeZone = TimeZone.getTimeZone(this.currentSinkConfig.getDatabaseTimeZone());
        }
        return this.currentSinkTimeZone;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startSink(final Source source, Properties properties, final String str) {
        this.keyConverter = new JsonConverter();
        this.keyConverter.configure(Map.of("converter.type", ConverterType.KEY.getName(), "schemas.enable", "true"));
        this.valueConverter = new JsonConverter();
        this.valueConverter.configure(Map.of("converter.type", ConverterType.VALUE.getName(), "schemas.enable", "true"));
        this.sinkTask = new JdbcSinkConnectorTask();
        HashMap hashMap = new HashMap();
        properties.forEach((obj, obj2) -> {
            hashMap.put((String) obj, (String) obj2);
        });
        this.currentSinkConfig = new JdbcSinkConnectorConfig(hashMap);
        this.sinkTask.initialize(new JdbcSinkTaskTestContext(hashMap));
        this.sinkTask.start(hashMap);
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", source.getKafka().getBootstrapServers());
        properties2.put("auto.offset.reset", "earliest");
        properties2.put("key.deserializer", ByteArrayDeserializer.class.getName());
        properties2.put("value.deserializer", ByteArrayDeserializer.class.getName());
        properties2.put("allow.auto.create.topics", "false");
        properties2.put("group.id", "jdbc-sink-consumer");
        this.consumer = new KafkaConsumer<>(properties2);
        this.stopLatch = new CountDownLatch(1);
        this.sinkExecutor = Executors.newFixedThreadPool(1);
        this.sinkExecutor.submit(new Runnable() { // from class: io.debezium.connector.jdbc.e2e.AbstractJdbcSinkIT.1
            @Override // java.lang.Runnable
            public void run() {
                Pattern compile = Pattern.compile("^" + source.getType().getValue() + ".*" + str + "$", 2);
                Awaitility.await("Topic with pattern not created").atMost(60L, TimeUnit.SECONDS).until(() -> {
                    return Boolean.valueOf(AbstractJdbcSinkIT.this.consumer.listTopics().keySet().stream().anyMatch(str2 -> {
                        return compile.matcher(str2).matches();
                    }));
                });
                AbstractJdbcSinkIT.this.consumer.subscribe(compile);
                AbstractJdbcSinkIT.LOGGER.info("KafkaConsumer thread is now polling for records.");
                while (AbstractJdbcSinkIT.this.stopLatch.getCount() == 1) {
                    ConsumerRecords poll = AbstractJdbcSinkIT.this.consumer.poll(Duration.ofSeconds(1L));
                    AbstractJdbcSinkIT.LOGGER.info("Consumer poll returned {} records", Integer.valueOf(poll.count()));
                    if (!poll.isEmpty()) {
                        poll.forEach(consumerRecord -> {
                            AbstractJdbcSinkIT.this.consumerRecords.add(AbstractJdbcSinkIT.this.getSinkRecordFromConsumerRecord(consumerRecord));
                        });
                    }
                }
                AbstractJdbcSinkIT.LOGGER.info("Unsubscribing from KafkaConsumer and closing consumer.");
                AbstractJdbcSinkIT.this.consumer.unsubscribe();
                AbstractJdbcSinkIT.this.consumer.close();
            }
        });
    }

    protected void stopSink() throws Exception {
        if (this.sinkExecutor != null) {
            this.stopLatch.countDown();
            this.sinkExecutor.shutdown();
            this.sinkExecutor.awaitTermination(60L, TimeUnit.SECONDS);
            this.sinkExecutor = null;
            this.stopLatch = null;
        }
        if (this.sinkTask != null) {
            this.sinkTask.stop();
            this.sinkTask = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SinkRecord consumeSinkRecord() {
        return consumeSinkRecords(1).get(0);
    }

    protected List<SinkRecord> consumeSinkRecords(int i) {
        ArrayList arrayList = new ArrayList();
        Awaitility.await("Expected to receive " + i + " from source connector").atMost(Duration.ofMinutes(1L)).until(() -> {
            if (this.consumerRecords.size() < i) {
                return false;
            }
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(this.consumerRecords.poll());
            }
            return true;
        });
        this.sinkTask.put(arrayList);
        return arrayList;
    }

    protected SinkRecord getSinkRecordFromConsumerRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        SchemaAndValue connectData = this.valueConverter.toConnectData(consumerRecord.topic(), (byte[]) consumerRecord.value());
        SchemaAndValue connectData2 = this.keyConverter.toConnectData(consumerRecord.topic(), (byte[]) consumerRecord.key());
        return new SinkRecord(consumerRecord.topic(), consumerRecord.partition(), connectData2.schema(), connectData2.value(), connectData.schema(), connectData.value(), consumerRecord.offset());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConnectorConfiguration getSourceConnectorConfig(Source source, String str) {
        ConnectorConfiguration create = ConnectorConfiguration.create();
        create.with("tasks.max", 1);
        create.with("key.converter", JsonConverter.class.getName());
        create.with("value.converter", JsonConverter.class.getName());
        create.with("topic.prefix", source.getType().getValue());
        create.with("database.hostname", source.getType().getValue());
        create.with("key.converter.schemas.enabled", "true");
        create.with("value.converter.schemas.enabled", "true");
        create.with("decimal.handling.mode", "double");
        create.with("time.precision.mode", source.getOptions().getTemporalPrecisionMode().getValue());
        if (source.getOptions().isFlatten()) {
            create.with("transforms", "flat");
            create.with("transforms.flat.type", "io.debezium.transforms.ExtractNewRecordState");
            create.with("transforms.flat.drop.tombstones", "false");
        }
        switch (source.getType()) {
            case MYSQL:
                create.with("connector.class", "io.debezium.connector.mysql.MySqlConnector");
                create.with("database.password", source.getPassword());
                create.with("database.user", "root");
                create.with("database.server.id", 12345);
                create.with("database.include.list", "test");
                create.with("table.include.list", "test." + str);
                create.with("schema.history.internal.kafka.bootstrap.servers", "kafka:9092");
                create.with("schema.history.internal.kafka.topic", "schema-history-mysql");
                create.with("schema.history.internal.store.only.captured.tables.ddl", "true");
                if (TestHelper.isConnectionTimeZoneUsed()) {
                    create.with("driver.connectionTimeZone", TestHelper.getSourceTimeZone());
                    create.with("driver.serverTimeZone", TestHelper.getSourceTimeZone());
                }
                if (source.getOptions().isColumnTypePropagated()) {
                    create.with("column.propagate.source.type", "test.*");
                    break;
                }
                break;
            case POSTGRES:
                create.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
                create.with("database.password", source.getPassword());
                create.with("database.user", source.getUsername());
                create.with("database.dbname", "test");
                create.with("slot.drop.on.stop", "true");
                create.with("schema.include.list", "public");
                create.with("table.include.list", "public." + str);
                if (source.getOptions().isColumnTypePropagated()) {
                    create.with("column.propagate.source.type", "public.*");
                    break;
                }
                break;
            case SQLSERVER:
                create.with("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
                create.with("database.password", source.getPassword());
                create.with("database.user", source.getUsername());
                create.with("database.names", "testDB");
                create.with("database.encrypt", "false");
                create.with("schema.history.internal.kafka.bootstrap.servers", "kafka:9092");
                create.with("schema.history.internal.kafka.topic", "schema-history-sqlserver");
                create.with("schema.history.internal.store.only.captured.tables.ddl", "true");
                create.with("table.include.list", "dbo." + str);
                if (source.getOptions().isColumnTypePropagated()) {
                    create.with("column.propagate.source.type", ".*");
                    break;
                }
                break;
            case ORACLE:
                create.with("connector.class", "io.debezium.connector.oracle.OracleConnector");
                create.with("database.dbname", "ORCLCDB");
                create.with("database.pdb.name", "ORCLPDB1");
                create.with("database.port", "1521");
                create.with("database.password", "dbz");
                create.with("database.user", "c##dbzuser");
                create.with("table.include.list", "debezium." + str);
                create.with("log.mining.strategy", "online_catalog");
                create.with("schema.history.internal.kafka.bootstrap.servers", "kafka:9092");
                create.with("schema.history.internal.kafka.topic", "schema-history-oracle");
                create.with("schema.history.internal.store.only.captured.tables.ddl", "true");
                if (source.getOptions().isColumnTypePropagated()) {
                    create.with("column.propagate.source.type", "debezium.*");
                    break;
                }
                break;
            default:
                throw new IllegalStateException("Unsupported source type: " + source.getType());
        }
        return create;
    }
}
