package io.debezium.processors;

import ch.qos.logback.classic.Level;
import io.debezium.config.Configuration;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/processors/AbstractReselectProcessorTest.class */
public abstract class AbstractReselectProcessorTest<T extends SourceConnector> extends AbstractAsyncEngineConnectorTest {
    protected abstract Class<T> getConnectorClass();

    protected abstract JdbcConnection databaseConnection();

    protected abstract Configuration.Builder getConfigurationBuilder();

    protected abstract String topicName();

    protected abstract String tableName();

    protected abstract String reselectColumnsList();

    protected abstract void createTable() throws Exception;

    protected abstract void dropTable() throws Exception;

    protected abstract String getInsertWithValue();

    protected abstract String getInsertWithNullValue();

    protected abstract void waitForStreamingStarted() throws InterruptedException;

    @Before
    public void beforeEach() throws Exception {
        createTable();
        databaseConnection().setAutoCommit(false);
    }

    @After
    public void afterEach() throws Exception {
        stopConnector();
        assertNoRecordsToConsume();
        dropTable();
    }

    @Test
    @FixFor({"DBZ-4321"})
    public void testNoColumnsReselectedWhenNullAndUnavailableColumnsAreDisabled() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
        logInterceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
        databaseConnection().execute(new String[]{getInsertWithNullValue()});
        start(getConnectorClass(), getConfigurationBuilder().with("snapshot.mode", "initial").with("reselector.reselect.null.values", "false").with("reselector.reselect.unavailable.values", "false").with("reselector.reselect.columns.include.list", reselectColumnsList()).build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        Assertions.assertThat(logInterceptor.containsMessage("disables both null and unavailable columns, no-reselection will occur")).isTrue();
        SourceRecord sourceRecord = consumeRecordsByTopicReselectWhenNotNullSnapshot().recordsForTopic(topicName()).get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        VerifyRecord.isValidRead(sourceRecord, fieldName("id"), 1);
        Assertions.assertThat(struct.get(fieldName("id"))).isEqualTo(1);
        Assertions.assertThat(struct.get(fieldName("data"))).isNull();
        Assertions.assertThat(struct.get(fieldName("data2"))).isEqualTo(1);
    }

    @Test
    @FixFor({"DBZ-4321"})
    public void testNoColumnsReselectedWhenNotNullSnapshot() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
        logInterceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
        databaseConnection().execute(new String[]{getInsertWithValue()});
        start(getConnectorClass(), getConfigurationBuilder().with("snapshot.mode", "initial").with("reselector.reselect.columns.include.list", reselectColumnsList()).build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        SourceRecord sourceRecord = consumeRecordsByTopicReselectWhenNotNullSnapshot().recordsForTopic(topicName()).get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        VerifyRecord.isValidRead(sourceRecord, fieldName("id"), 1);
        Assertions.assertThat(struct.get(fieldName("id"))).isEqualTo(1);
        Assertions.assertThat(struct.get(fieldName("data"))).isEqualTo("one");
        Assertions.assertThat(struct.get(fieldName("data2"))).isEqualTo(1);
        Assertions.assertThat(logInterceptor.containsMessage("No columns require re-selection.")).isTrue();
    }

    @Test
    @FixFor({"DBZ-4321"})
    public void testNoColumnsReselectedWhenNotNullStreaming() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
        logInterceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
        start(getConnectorClass(), getConfigurationBuilder().with("reselector.reselect.columns.include.list", reselectColumnsList()).build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        databaseConnection().execute(new String[]{getInsertWithValue()});
        databaseConnection().execute(new String[]{String.format("UPDATE %s SET data = 'two' where id = 1", tableName())});
        databaseConnection().execute(new String[]{String.format("DELETE FROM %s WHERE id = 1", tableName())});
        List<SourceRecord> recordsForTopic = consumeRecordsByTopicReselectWhenNotNullStreaming().recordsForTopic(topicName());
        SourceRecord sourceRecord = recordsForTopic.get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        VerifyRecord.isValidInsert(sourceRecord, fieldName("id"), 1);
        Assertions.assertThat(struct.get(fieldName("id"))).isEqualTo(1);
        Assertions.assertThat(struct.get(fieldName("data"))).isEqualTo("one");
        Assertions.assertThat(struct.get(fieldName("data2"))).isEqualTo(1);
        SourceRecord sourceRecord2 = recordsForTopic.get(1);
        Struct struct2 = ((Struct) sourceRecord2.value()).getStruct("after");
        VerifyRecord.isValidUpdate(sourceRecord2, fieldName("id"), 1);
        Assertions.assertThat(struct2.get(fieldName("id"))).isEqualTo(1);
        Assertions.assertThat(struct2.get(fieldName("data"))).isEqualTo("two");
        Assertions.assertThat(struct2.get(fieldName("data2"))).isEqualTo(1);
        SourceRecord sourceRecord3 = recordsForTopic.get(2);
        Struct struct3 = ((Struct) sourceRecord3.value()).getStruct("after");
        VerifyRecord.isValidDelete(sourceRecord3, fieldName("id"), 1);
        Assertions.assertThat(struct3).isNull();
        SourceRecord sourceRecord4 = recordsForTopic.get(3);
        VerifyRecord.isValidTombstone(sourceRecord4, fieldName("id"), 1);
        Assertions.assertThat(sourceRecord4.value()).isNull();
        Assertions.assertThat(logInterceptor.containsMessage("No columns require re-selection.")).isTrue();
    }

    @Test
    @FixFor({"DBZ-4321"})
    public void testColumnsReselectedWhenValueIsNullSnapshot() throws Exception {
        databaseConnection().execute(new String[]{getInsertWithNullValue()});
        databaseConnection().execute(new String[]{String.format("UPDATE %s SET data = 'two' where id = 1", tableName())});
        start(getConnectorClass(), getConfigurationBuilder().with("snapshot.mode", "initial").with("reselector.reselect.columns.include.list", reselectColumnsList()).build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        SourceRecord sourceRecord = consumeRecordsByTopicReselectWhenNullSnapshot().recordsForTopic(topicName()).get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        VerifyRecord.isValidRead(sourceRecord, fieldName("id"), 1);
        Assertions.assertThat(struct.get(fieldName("id"))).isEqualTo(1);
        Assertions.assertThat(struct.get(fieldName("data"))).isEqualTo("two");
        Assertions.assertThat(struct.get(fieldName("data2"))).isEqualTo(1);
    }

    @Test
    @FixFor({"DBZ-4321"})
    public void testColumnsReselectedWhenValueIsNullStreaming() throws Exception {
        start(getConnectorClass(), getConfigurationBuilder().with("reselector.reselect.columns.include.list", reselectColumnsList()).build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        databaseConnection().executeWithoutCommitting(new String[]{getInsertWithNullValue()});
        databaseConnection().executeWithoutCommitting(new String[]{String.format("UPDATE %s SET data = 'two' where id = 1", tableName())});
        databaseConnection().commit();
        List<SourceRecord> recordsForTopic = consumeRecordsByTopicReselectWhenNullStreaming().recordsForTopic(topicName());
        SourceRecord sourceRecord = recordsForTopic.get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
        VerifyRecord.isValidInsert(sourceRecord, fieldName("id"), 1);
        Assertions.assertThat(struct.get(fieldName("id"))).isEqualTo(1);
        Assertions.assertThat(struct.get(fieldName("data"))).isEqualTo("two");
        Assertions.assertThat(struct.get(fieldName("data2"))).isEqualTo(1);
        SourceRecord sourceRecord2 = recordsForTopic.get(1);
        Struct struct2 = ((Struct) sourceRecord2.value()).getStruct("after");
        VerifyRecord.isValidUpdate(sourceRecord2, fieldName("id"), 1);
        Assertions.assertThat(struct2.get(fieldName("id"))).isEqualTo(1);
        Assertions.assertThat(struct2.get(fieldName("data"))).isEqualTo("two");
        Assertions.assertThat(struct2.get(fieldName("data2"))).isEqualTo(1);
    }

    protected AbstractConnectorTest.SourceRecords consumeRecordsByTopicReselectWhenNotNullSnapshot() throws InterruptedException {
        return consumeRecordsByTopic(1);
    }

    protected AbstractConnectorTest.SourceRecords consumeRecordsByTopicReselectWhenNotNullStreaming() throws InterruptedException {
        return consumeRecordsByTopic(4);
    }

    protected AbstractConnectorTest.SourceRecords consumeRecordsByTopicReselectWhenNullSnapshot() throws InterruptedException {
        return consumeRecordsByTopic(1);
    }

    protected AbstractConnectorTest.SourceRecords consumeRecordsByTopicReselectWhenNullStreaming() throws InterruptedException {
        return consumeRecordsByTopic(2);
    }

    protected String fieldName(String str) {
        return str;
    }
}
