package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.class */
public abstract class AbstractSnapshotTest<T extends SourceConnector> extends AbstractAsyncEngineConnectorTest {
    protected static final int ROW_COUNT = 1000;
    protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-is.txt").toAbsolutePath();
    protected static final int PARTITION_NO = 0;
    protected static final String SERVER_NAME = "test_server";
    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 5;
    protected final Path signalsFile = Paths.get("src", "test", "resources").resolve("debezium_signaling_file.txt");

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract Class<T> connectorClass();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract JdbcConnection databaseConnection();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract String topicName();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract String tableName();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract List<String> topicNames();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract List<String> tableNames();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract String signalTableName();

    /* JADX INFO: Access modifiers changed from: protected */
    public String signalTableNameSanitized() {
        return signalTableName();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract Configuration.Builder config();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract Configuration.Builder mutableConfig(boolean z, boolean z2);

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract String connector();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract String server();

    /* JADX INFO: Access modifiers changed from: protected */
    public String task() {
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String database() {
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForCdcTransactionPropagation(int i) throws Exception {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String alterTableAddColumnStatement(String str) {
        return "ALTER TABLE " + str + " add col3 int default 0";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String alterTableDropColumnStatement(String str) {
        return "ALTER TABLE " + str + " drop column col3";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String tableDataCollectionId() {
        return tableName();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<String> tableDataCollectionIds() {
        return tableNames();
    }

    protected void populateTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        jdbcConnection.setAutoCommit(false);
        for (int i = PARTITION_NO; i < ROW_COUNT; i++) {
            jdbcConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", str, jdbcConnection.quotedColumnIdString(pkFieldName()), Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
        jdbcConnection.commit();
    }

    protected void populateTable(JdbcConnection jdbcConnection) throws SQLException {
        populateTable(jdbcConnection, tableName());
    }

    protected void populateTables(JdbcConnection jdbcConnection) throws SQLException {
        Iterator<String> it = tableNames().iterator();
        while (it.hasNext()) {
            populateTable(jdbcConnection, it.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void populateTable() throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populateTable(databaseConnection);
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void populateTable(String str) throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populateTable(databaseConnection, str);
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void populateTableWithSpecificValue(int i, int i2, int i3) throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populateTableWithSpecificValue(databaseConnection, tableName(), i, i2, i3);
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void populateTableWithSpecificValue(JdbcConnection jdbcConnection, String str, int i, int i2, int i3) throws SQLException {
        jdbcConnection.setAutoCommit(false);
        for (int i4 = i + 1; i4 <= i + i2; i4++) {
            jdbcConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", str, jdbcConnection.quotedColumnIdString(pkFieldName()), Integer.valueOf(i2 + i4), Integer.valueOf(i3))});
        }
        jdbcConnection.commit();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void populateTables() throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populateTables(databaseConnection);
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void populate4PkTable(JdbcConnection jdbcConnection, String str) throws SQLException {
        jdbcConnection.setAutoCommit(false);
        for (int i = PARTITION_NO; i < ROW_COUNT; i++) {
            int i2 = i + 1;
            jdbcConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)", str, Integer.valueOf(i2 / ROW_COUNT), Integer.valueOf((i2 / 100) % 10), Integer.valueOf((i2 / 10) % 10), Integer.valueOf(i2 % 10), Integer.valueOf(i))});
        }
        jdbcConnection.commit();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int i) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(i, topicName());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int i, String str) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(i, sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName());
        }, entry -> {
            return true;
        }, null, str);
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int i, Function<SourceRecord, V> function, Predicate<Map.Entry<Integer, V>> predicate, Consumer<List<SourceRecord>> consumer, String str) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(i, predicate, struct -> {
            return struct.getInt32(pkFieldName());
        }, function, str, consumer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int i, Predicate<Map.Entry<Integer, V>> predicate, Function<Struct, Integer> function, Function<SourceRecord, V> function2, String str, Consumer<List<SourceRecord>> consumer) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(i, predicate, function, function2, str, consumer, true);
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int i, Predicate<Map.Entry<Integer, V>> predicate, Function<Struct, Integer> function, Function<SourceRecord, V> function2, String str, Consumer<List<SourceRecord>> consumer, boolean z) throws InterruptedException {
        HashMap hashMap = new HashMap();
        int i2 = PARTITION_NO;
        while (true) {
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1, z);
            List<SourceRecord> recordsForTopic = consumeRecordsByTopic.recordsForTopic(str);
            if (consumeRecordsByTopic.allRecordsInOrder().isEmpty()) {
                i2++;
                Assertions.assertThat(i2).describedAs(String.format("Too many no data record results, %d < %d", Integer.valueOf(hashMap.size()), Integer.valueOf(i)), new Object[PARTITION_NO]).isLessThanOrEqualTo(MAXIMUM_NO_RECORDS_CONSUMES);
            } else {
                i2 = PARTITION_NO;
                if (recordsForTopic != null && !recordsForTopic.isEmpty()) {
                    recordsForTopic.forEach(sourceRecord -> {
                        int intValue = ((Integer) function.apply((Struct) sourceRecord.key())).intValue();
                        hashMap.put(Integer.valueOf(intValue), function2.apply(sourceRecord));
                    });
                    if (consumer != null) {
                        consumer.accept(recordsForTopic);
                    }
                    if (hashMap.size() >= i && !hashMap.entrySet().stream().anyMatch(predicate.negate())) {
                        Assertions.assertThat(hashMap).hasSize(i);
                        return hashMap;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int i) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(i, Function.identity(), entry -> {
            return true;
        }, null, topicName());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int i, Predicate<Map.Entry<Integer, Integer>> predicate, Consumer<List<SourceRecord>> consumer) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(i, sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName());
        }, predicate, consumer, topicName());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int i, Predicate<Map.Entry<Integer, SourceRecord>> predicate, Consumer<List<SourceRecord>> consumer) throws InterruptedException {
        return consumeMixedWithIncrementalSnapshot(i, Function.identity(), predicate, consumer, topicName());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String valueFieldName() {
        return "aa";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String pkFieldName() {
        return "pk";
    }

    protected void startConnector(DebeziumEngine.CompletionCallback completionCallback) {
        startConnector(Function.identity(), completionCallback, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startConnector(Function<Configuration.Builder, Configuration.Builder> function) {
        startConnector(function, loggingCompletion(), true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startConnector(Function<Configuration.Builder, Configuration.Builder> function, DebeziumEngine.CompletionCallback completionCallback, boolean z) {
        start(connectorClass(), function.apply(config()).build(), completionCallback);
        waitForConnectorToStart();
        waitForAvailableRecords(5L, TimeUnit.SECONDS);
        if (z) {
            assertNoRecordsToConsume();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startConnectorWithSnapshot(Function<Configuration.Builder, Configuration.Builder> function) {
        startConnector(function, loggingCompletion(), false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startConnector() {
        startConnector(Function.identity(), loggingCompletion(), true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForConnectorToStart() {
        assertConnectorIsRunning();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Function<Struct, Integer> getRecordValue() {
        return struct -> {
            return struct.getStruct("after").getInt32(valueFieldName());
        };
    }

    @Override // io.debezium.embedded.AbstractConnectorTest
    protected int getMaximumEnqueuedRecordCount() {
        return 3000;
    }

    protected void sendExecuteSnapshotFileSignal(String str) throws IOException {
        sendExecuteSnapshotFileSignal(str, "INCREMENTAL", this.signalsFile);
    }

    protected void sendExecuteSnapshotFileSignal(String str, String str2, Path path) throws IOException {
        Files.write(path, String.format("{\"id\":\"12345\",\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"%s\"}}", str, str2).getBytes(), new OpenOption[PARTITION_NO]);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendAdHocSnapshotSignal(String... strArr) throws SQLException {
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String str, String str2, String... strArr) {
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(str, str2, AbstractSnapshotSignal.SnapshotType.INCREMENTAL, strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> map, String str, String... strArr) {
        sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(map, str, AbstractSnapshotSignal.SnapshotType.INCREMENTAL, strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String str, String str2, AbstractSnapshotSignal.SnapshotType snapshotType, String... strArr) {
        String str3 = (String) Arrays.stream(strArr).map(str4 -> {
            return "\"" + str4 + "\"";
        }).collect(Collectors.joining(", "));
        try {
            JdbcConnection databaseConnection = databaseConnection();
            try {
                String format = (Strings.isNullOrEmpty(str) || Strings.isNullOrEmpty(str2)) ? !Strings.isNullOrEmpty(str) ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s}')", signalTableName(), snapshotType.toString(), str3, str) : !Strings.isNullOrEmpty(str2) ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')", signalTableName(), snapshotType.toString(), str3, str2) : String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')", signalTableName(), snapshotType.toString(), str3) : String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')", signalTableName(), snapshotType.toString(), str3, str, str2);
                this.logger.info("Sending signal with query {}", format);
                databaseConnection.execute(new String[]{format});
                if (databaseConnection != null) {
                    databaseConnection.close();
                }
            } finally {
            }
        } catch (Exception e) {
            this.logger.warn("Failed to send signal", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> map, String str, AbstractSnapshotSignal.SnapshotType snapshotType, String... strArr) {
        String str2 = (String) Arrays.stream(strArr).map(str3 -> {
            return "\"" + str3 + "\"";
        }).collect(Collectors.joining(", "));
        try {
            JdbcConnection databaseConnection = databaseConnection();
            try {
                String format = (map.isEmpty() || Strings.isNullOrEmpty(str)) ? !map.isEmpty() ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s]}')", signalTableName(), snapshotType.toString(), str2, buildAdditionalConditions(map)) : !Strings.isNullOrEmpty(str) ? String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')", signalTableName(), snapshotType.toString(), str2, str) : String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')", signalTableName(), snapshotType.toString(), str2) : String.format("INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s], \"surrogate-key\": %s}')", signalTableName(), snapshotType.toString(), str2, buildAdditionalConditions(map), str);
                this.logger.info("Sending signal with query {}", format);
                databaseConnection.execute(new String[]{format});
                if (databaseConnection != null) {
                    databaseConnection.close();
                }
            } finally {
            }
        } catch (Exception e) {
            this.logger.warn("Failed to send signal", e);
        }
    }

    protected static String buildAdditionalConditions(Map<String, String> map) {
        return (String) map.entrySet().stream().map(entry -> {
            return String.format("{\"data-collection\": \"%s\", \"filter\": \"%s\"}", entry.getKey(), entry.getValue());
        }).collect(Collectors.joining(","));
    }
}
