package io.debezium.embedded;

import io.debezium.config.Configuration;
import io.debezium.data.SchemaUtil;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.function.BooleanConsumer;
import io.debezium.junit.SkipTestRule;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.fest.assertions.Assertions;
import org.fest.assertions.BooleanAssert;
import org.fest.assertions.Delta;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/embedded/AbstractConnectorTest.class */
public abstract class AbstractConnectorTest implements Testing {
    /** Absolute path of the offset storage file, derived from the testing path {@code file-connector-offsets.txt}. */
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
    /** Executor that runs the engine; created in {@code start(...)} and reset to {@code null} in {@code stopConnector(...)}. */
    private ExecutorService executor;
    /** Engine under test; built in {@code start(...)} and reset to {@code null} in {@code stopConnector(...)}. */
    protected EmbeddedEngine engine;
    /** Queue the engine's record callback puts records into and the {@code consume*} methods read from. */
    private BlockingQueue<SourceRecord> consumedLines;
    /** Latch awaited by {@code start(...)}; counted down from the engine callbacks registered there. */
    private CountDownLatch latch;

    /** JUnit rule backed by {@link SkipTestRule}. */
    @Rule
    public TestRule skipTestRule = new SkipTestRule();
    /** Timeout, in milliseconds, used when polling {@code consumedLines}; 5 seconds unless changed via {@code setConsumeTimeout}. */
    protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5);
    /** Logger named after the concrete test class. */
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // JSON converters/deserializers; replaced with freshly configured instances before every test
    // by initializeConnectorTestFramework().
    private JsonConverter keyJsonConverter = new JsonConverter();
    private JsonConverter valueJsonConverter = new JsonConverter();
    private JsonDeserializer keyJsonDeserializer = new JsonDeserializer();
    private JsonDeserializer valueJsonDeserializer = new JsonDeserializer();

    /* loaded from: input_file:io/debezium/embedded/AbstractConnectorTest$SourceRecords.class */
    /**
     * Collects consumed {@link SourceRecord}s in arrival order and indexes them by topic and,
     * for records whose value is a {@link Struct} with a {@code databaseName} field, by the
     * value of that field.
     */
    protected class SourceRecords {
        private final List<SourceRecord> records = new ArrayList<>();
        private final Map<String, List<SourceRecord>> recordsByTopic = new HashMap<>();
        private final Map<String, List<SourceRecord>> ddlRecordsByDbName = new HashMap<>();

        protected SourceRecords() {
        }

        /**
         * Adds a record to the ordered list and to the per-topic index; also adds it to the
         * per-database index when {@link #getAffectedDatabase(SourceRecord)} returns a name.
         *
         * @param sourceRecord the record to register
         */
        public void add(SourceRecord sourceRecord) {
            this.records.add(sourceRecord);
            this.recordsByTopic.computeIfAbsent(sourceRecord.topic(), topic -> new ArrayList<>()).add(sourceRecord);
            String affectedDatabase = getAffectedDatabase(sourceRecord);
            if (affectedDatabase != null) {
                this.ddlRecordsByDbName.computeIfAbsent(affectedDatabase, dbName -> new ArrayList<>()).add(sourceRecord);
            }
        }

        /**
         * Reads the {@code databaseName} field from the record's value.
         *
         * @param sourceRecord the record to inspect
         * @return the field's string value, or {@code null} when the value is absent, is not a
         *         {@link Struct}, or its schema has no {@code databaseName} field
         */
        protected String getAffectedDatabase(SourceRecord sourceRecord) {
            Object value = sourceRecord.value();
            // Guard with instanceof (which also rejects null) so that records with a
            // non-Struct value are simply not indexed by database instead of failing the cast.
            if (!(value instanceof Struct)) {
                return null;
            }
            Struct struct = (Struct) value;
            Field field = struct.schema().field("databaseName");
            if (field == null) {
                return null;
            }
            return struct.getString(field.name());
        }

        /**
         * @param str the database name
         * @return the records indexed under that database name, or {@code null} if there are none
         */
        public List<SourceRecord> ddlRecordsForDatabase(String str) {
            return this.ddlRecordsByDbName.get(str);
        }

        /**
         * @return the database names that have at least one indexed record (a view of the index keys)
         */
        public Set<String> databaseNames() {
            return this.ddlRecordsByDbName.keySet();
        }

        /**
         * @param str the topic name
         * @return the records added for that topic, or {@code null} if there are none
         */
        public List<SourceRecord> recordsForTopic(String str) {
            return this.recordsByTopic.get(str);
        }

        /**
         * @return the topics that have at least one record (a view of the index keys)
         */
        public Set<String> topics() {
            return this.recordsByTopic.keySet();
        }

        /**
         * Passes every record of one topic to the consumer, in the order they were added.
         *
         * @param str the topic name
         * @param consumer receives each record of the topic
         * @throws NullPointerException if no record was added for the topic; the exception type is
         *         unchanged from the previous implicit failure, but it now names the topic
         */
        public void forEachInTopic(String str, Consumer<SourceRecord> consumer) {
            List<SourceRecord> topicRecords = recordsForTopic(str);
            if (topicRecords == null) {
                // Fail loudly rather than skipping: silently iterating over nothing would let a
                // test's per-record assertions pass vacuously.
                throw new NullPointerException("No records were consumed for topic '" + str + "'");
            }
            topicRecords.forEach(consumer);
        }

        /**
         * Passes every record to the consumer, in the order they were added.
         *
         * @param consumer receives each record
         */
        public void forEach(Consumer<SourceRecord> consumer) {
            this.records.forEach(consumer);
        }

        /**
         * @return an unmodifiable view of all records in the order they were added
         */
        public List<SourceRecord> allRecordsInOrder() {
            return Collections.unmodifiableList(this.records);
        }

        /**
         * Prints the topic summary, the number of events per topic and then every record,
         * using {@code Testing.print} and the enclosing test's {@code print(SourceRecord)}.
         */
        public void print() {
            Testing.print("" + topics().size() + " topics: " + topics());
            this.recordsByTopic.forEach((topic, topicRecords) ->
                    Testing.print(" - topic:'" + topic + "'; # of events = " + topicRecords.size()));
            Testing.print("Records:");
            this.records.forEach(sourceRecord -> AbstractConnectorTest.this.print(sourceRecord));
        }
    }

    /**
     * Per-test setup: establishes the logging context, creates and configures fresh JSON
     * converters and deserializers, calls {@code resetBeforeEachTest()}, allocates the record
     * queue and removes any previous offset file while making sure its parent directory exists.
     */
    @Before
    public final void initializeConnectorTestFramework() {
        LoggingContext.forConnector(getClass().getSimpleName(), "", "test");

        // Fresh instances for every test.
        keyJsonConverter = new JsonConverter();
        valueJsonConverter = new JsonConverter();
        keyJsonDeserializer = new JsonDeserializer();
        valueJsonDeserializer = new JsonDeserializer();

        // Both configurations are built without any explicit settings; the boolean flag tells
        // each component whether it handles keys (true) or values (false).
        final Configuration converterConfig = Configuration.create().build();
        final Configuration deserializerConfig = Configuration.create().build();
        keyJsonConverter.configure(converterConfig.asMap(), true);
        valueJsonConverter.configure(converterConfig.asMap(), false);
        keyJsonDeserializer.configure(deserializerConfig.asMap(), true);
        valueJsonDeserializer.configure(deserializerConfig.asMap(), false);

        resetBeforeEachTest();

        consumedLines = new ArrayBlockingQueue<>(getMaximumEnqueuedRecordCount());

        // Start from a clean offset store.
        Testing.Files.delete(OFFSET_STORE_PATH);
        OFFSET_STORE_PATH.getParent().toFile().mkdirs();
    }

    /**
     * Per-test teardown: stops the connector without registering a completion callback.
     */
    @After
    public final void stopConnector() {
        final BooleanConsumer noCallback = null;
        stopConnector(noCallback);
    }

    /**
     * Stops the engine and its executor, waiting for both to finish, then optionally reports
     * whether the engine is still running. The {@code engine} and {@code executor} fields are
     * always reset to {@code null} afterwards, even if an assertion fails along the way.
     *
     * @param booleanConsumer if not {@code null}, receives {@code true} when the engine still
     *        reports itself as running after the shutdown attempts, {@code false} otherwise
     *        (including when no engine was started)
     */
    public void stopConnector(BooleanConsumer booleanConsumer) {
        try {
            this.logger.info("Stopping the connector");
            // Ask the engine to stop and give it a bounded amount of time to do so.
            if (this.engine != null && this.engine.isRunning()) {
                this.engine.stop();
                try {
                    this.engine.await(8L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Clear the interrupt flag so the remaining cleanup steps can still block.
                    Thread.interrupted();
                }
            }
            if (this.executor != null) {
                // No task should have been left waiting in the executor's queue.
                Assertions.assertThat(this.executor.shutdownNow()).isEmpty();
                try {
                    while (!this.executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                        // keep waiting until the engine thread has terminated
                    }
                } catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
            if (this.engine != null && this.engine.isRunning()) {
                try {
                    while (!this.engine.await(5L, TimeUnit.SECONDS)) {
                        // keep waiting until the engine has stopped completely
                    }
                } catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
            if (booleanConsumer != null) {
                booleanConsumer.accept(this.engine != null && this.engine.isRunning());
            }
        } finally {
            this.engine = null;
            this.executor = null;
        }
    }

    /**
     * Capacity used for the record queue allocated before each test. Subclasses may override.
     *
     * @return the maximum number of records the queue can hold; 100 in this implementation
     */
    protected int getMaximumEnqueuedRecordCount() {
        final int defaultCapacity = 100;
        return defaultCapacity;
    }

    /**
     * Creates a completion callback that only logs the outcome.
     *
     * @return a callback that logs the message at INFO level on success, and at ERROR level
     *         together with the throwable otherwise
     */
    protected EmbeddedEngine.CompletionCallback loggingCompletion() {
        return (success, message, error) -> {
            if (!success) {
                logger.error(message, error);
                return;
            }
            logger.info(message);
        };
    }

    /**
     * Starts the connector with the logging completion callback and no stop predicate.
     * (Decompiler note: the access modifier was changed from {@code protected}.)
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     */
    public void start(Class<? extends SourceConnector> cls, Configuration configuration) {
        final Predicate<SourceRecord> noStopCondition = null;
        start(cls, configuration, loggingCompletion(), noStopCondition);
    }

    /**
     * Starts the connector with the logging completion callback and the given stop predicate.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param predicate tested against each record; a match stops the connector (may be {@code null})
     */
    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, Predicate<SourceRecord> predicate) {
        final EmbeddedEngine.CompletionCallback onCompletion = loggingCompletion();
        start(cls, configuration, onCompletion, predicate);
    }

    /**
     * Starts the connector with the given completion callback and no stop predicate.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param completionCallback invoked when the engine completes (may be {@code null})
     */
    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, EmbeddedEngine.CompletionCallback completionCallback) {
        final Predicate<SourceRecord> noStopCondition = null;
        start(cls, configuration, completionCallback, noStopCondition);
    }

    /**
     * Builds an embedded engine for the given connector, runs it on a dedicated single-thread
     * executor and waits up to 10 seconds for either a task to start or the engine to complete
     * unsuccessfully.
     *
     * <p>The supplied configuration is copied and extended with the engine name
     * {@code testing-connector}, the connector class name, the offset file location and an
     * offset flush interval of 0.
     *
     * @param cls the connector class to run
     * @param configuration the connector configuration
     * @param completionCallback invoked when the engine completes; may be {@code null}
     * @param predicate tested against each record before it is queued; when it matches, a
     *        {@link ConnectException} is thrown from the record handler; may be {@code null}
     */
    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, EmbeddedEngine.CompletionCallback completionCallback, Predicate<SourceRecord> predicate) {
        Configuration engineConfig = Configuration.copy(configuration)
                .with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
                .with(EmbeddedEngine.CONNECTOR_CLASS, cls.getName())
                .with("offset.storage.file.filename", OFFSET_STORE_PATH)
                .with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
                .build();

        // The callbacks capture this local so they always release the latch this call waits on,
        // independent of later reassignments of the field.
        final CountDownLatch startupLatch = new CountDownLatch(1);
        this.latch = startupLatch;

        EmbeddedEngine.CompletionCallback wrapperCallback = (success, message, error) -> {
            try {
                if (completionCallback != null) {
                    completionCallback.handle(success, message, error);
                }
            } finally {
                // Unblock the waiting test on failure even when no user callback was supplied;
                // on success the latch is released by taskStarted() instead.
                if (!success) {
                    startupLatch.countDown();
                }
            }
            Testing.debug("Stopped connector");
        };

        final EmbeddedEngine newEngine = EmbeddedEngine.create()
                .using(engineConfig)
                .notifying(sourceRecord -> {
                    if (predicate != null && predicate.test(sourceRecord)) {
                        this.logger.error("Stopping connector after record as requested");
                        throw new ConnectException("Stopping connector after record as requested");
                    }
                    try {
                        this.consumedLines.put(sourceRecord);
                    } catch (InterruptedException e) {
                        // Clear the flag; the record is dropped and the engine keeps control.
                        Thread.interrupted();
                    }
                })
                .using(getClass().getClassLoader())
                .using(wrapperCallback)
                .using(new EmbeddedEngine.ConnectorCallback() {
                    @Override
                    public void taskStarted() {
                        startupLatch.countDown();
                    }
                })
                .build();
        this.engine = newEngine;

        Assertions.assertThat(this.executor).isNull();
        this.executor = Executors.newFixedThreadPool(1);
        this.executor.execute(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            // Use the captured engine: the field may already have been cleared by stopConnector.
            newEngine.run();
        });

        try {
            if (!startupLatch.await(10L, TimeUnit.SECONDS)) {
                // Startup may simply be slow, so only warn and carry on.
                this.logger.warn("The connector did not finish starting its task(s) or complete in the expected amount of time");
            }
        } catch (InterruptedException e) {
            // The interrupt flag is already cleared once InterruptedException is thrown, so fail
            // unconditionally rather than testing the flag again.
            Assert.fail("Interrupted while waiting for engine startup");
        }
    }

    /**
     * Sets the timeout used when polling for consumed records.
     *
     * @param j the timeout; must not be negative
     * @param timeUnit the unit of {@code j}
     * @throws IllegalArgumentException if {@code j} is negative
     */
    protected void setConsumeTimeout(long j, TimeUnit timeUnit) {
        if (j >= 0) {
            pollTimeoutInMs = timeUnit.toMillis(j);
            return;
        }
        throw new IllegalArgumentException("The timeout may not be negative");
    }

    /**
     * Takes the next record from the queue, waiting up to the configured poll timeout.
     *
     * @return the next record, or {@code null} if none arrived within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    protected SourceRecord consumeRecord() throws InterruptedException {
        final SourceRecord next = consumedLines.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS);
        return next;
    }

    /**
     * Consumes up to {@code i} records without passing them to any consumer.
     *
     * @param i the maximum number of records to consume
     * @return the number of records actually consumed
     * @throws InterruptedException if interrupted while waiting for a record
     */
    protected int consumeRecords(int i) throws InterruptedException {
        final Consumer<SourceRecord> noConsumer = null;
        return consumeRecords(i, noConsumer);
    }

    /**
     * Consumes up to {@code i} records, waiting up to the poll timeout for each one and
     * stopping early as soon as a poll times out. Each record is passed to the consumer (if
     * any) and then written via {@code debug} or {@code print} when the respective testing
     * output is enabled. (Decompiler note: the access modifier was changed from {@code protected}.)
     *
     * @param i the maximum number of records to consume
     * @param consumer receives each consumed record; may be {@code null}
     * @return the number of records actually consumed
     * @throws InterruptedException if interrupted while waiting for a record
     */
    public int consumeRecords(int i, Consumer<SourceRecord> consumer) throws InterruptedException {
        int consumed = 0;
        while (consumed < i) {
            SourceRecord next = consumedLines.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                // Timed out waiting: report what has been consumed so far.
                break;
            }
            consumed++;
            if (consumer != null) {
                consumer.accept(next);
            }
            if (Testing.Debug.isEnabled()) {
                Testing.debug("Consumed record " + consumed + " / " + i + " (" + (i - consumed) + " more)");
                debug(next);
            } else if (Testing.Print.isEnabled()) {
                Testing.print("Consumed record " + consumed + " / " + i + " (" + (i - consumed) + " more)");
                print(next);
            }
        }
        return consumed;
    }

    /**
     * Consumes up to {@code i} records and collects them into a {@link SourceRecords} instance.
     *
     * @param i the maximum number of records to consume
     * @return the collected records; holds fewer than {@code i} records if polling timed out
     * @throws InterruptedException if interrupted while waiting for a record
     */
    protected SourceRecords consumeRecordsByTopic(int i) throws InterruptedException {
        final SourceRecords collected = new SourceRecords();
        consumeRecords(i, record -> collected.add(record));
        return collected;
    }

    /**
     * Drains every record currently in the queue without waiting and passes each one to the
     * consumer.
     *
     * @param consumer receives each drained record in queue order; may be {@code null}
     * @return the number of records drained
     */
    protected int consumeAvailableRecords(Consumer<SourceRecord> consumer) {
        final List<SourceRecord> drained = new ArrayList<>();
        consumedLines.drainTo(drained);
        if (consumer != null) {
            for (SourceRecord record : drained) {
                consumer.accept(record);
            }
        }
        return drained.size();
    }

    /**
     * Waits until at least one record is available in the queue or the timeout elapses.
     *
     * <p>The queue is checked in short sleep intervals rather than in a tight loop, so the
     * waiting thread does not compete for CPU with the engine thread producing the records.
     * If the waiting thread is interrupted, its interrupt flag is restored and the wait ends
     * early.
     *
     * @param j the maximum time to wait; must not be negative
     * @param timeUnit the unit of {@code j}
     * @return {@code true} if the queue holds at least one record when the method returns
     */
    protected boolean waitForAvailableRecords(long j, TimeUnit timeUnit) {
        Assertions.assertThat(j).isGreaterThanOrEqualTo(0L);
        final long deadline = System.currentTimeMillis() + timeUnit.toMillis(j);
        while (this.consumedLines.isEmpty()) {
            // Computing the remaining time by subtraction stays correct even when the
            // deadline itself wrapped around for a very large timeout.
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            try {
                Thread.sleep(Math.min(remaining, 10L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return !this.consumedLines.isEmpty();
    }

    /**
     * Asserts that the engine reports itself as running.
     */
    protected void assertConnectorIsRunning() {
        final boolean running = engine.isRunning();
        Assertions.assertThat(running).isTrue();
    }

    /**
     * Asserts that the engine reports itself as not running.
     */
    protected void assertConnectorNotRunning() {
        final boolean running = engine.isRunning();
        Assertions.assertThat(running).isFalse();
    }

    /**
     * Asserts that the record queue is currently empty.
     * (Decompiler note: the access modifier was changed from {@code protected}.)
     */
    public void assertNoRecordsToConsume() {
        final boolean queueEmpty = consumedLines.isEmpty();
        Assertions.assertThat(queueEmpty).isTrue();
    }

    /**
     * Verifies the record's key by delegating to {@code VerifyRecord.hasValidKey}.
     *
     * @param sourceRecord the record to check
     * @param str forwarded unchanged; presumably the key field name (confirm against VerifyRecord)
     * @param i forwarded unchanged; presumably the expected key value (confirm against VerifyRecord)
     */
    protected void assertKey(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.hasValidKey(sourceRecord, str, i);
    }

    /**
     * Verifies that the record is a valid insert by delegating to {@code VerifyRecord.isValidInsert}.
     *
     * @param sourceRecord the record to check
     * @param str forwarded unchanged; presumably the key field name (confirm against VerifyRecord)
     * @param i forwarded unchanged; presumably the expected key value (confirm against VerifyRecord)
     */
    protected void assertInsert(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.isValidInsert(sourceRecord, str, i);
    }

    /**
     * Verifies that the record is a valid update by delegating to {@code VerifyRecord.isValidUpdate}.
     *
     * @param sourceRecord the record to check
     * @param str forwarded unchanged; presumably the key field name (confirm against VerifyRecord)
     * @param i forwarded unchanged; presumably the expected key value (confirm against VerifyRecord)
     */
    protected void assertUpdate(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.isValidUpdate(sourceRecord, str, i);
    }

    /**
     * Verifies that the record is a valid delete by delegating to {@code VerifyRecord.isValidDelete}.
     *
     * @param sourceRecord the record to check
     * @param str forwarded unchanged; presumably the key field name (confirm against VerifyRecord)
     * @param i forwarded unchanged; presumably the expected key value (confirm against VerifyRecord)
     */
    protected void assertDelete(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.isValidDelete(sourceRecord, str, i);
    }

    /**
     * Verifies that the record is a valid tombstone for a specific key by delegating to
     * {@code VerifyRecord.isValidTombstone}.
     *
     * @param sourceRecord the record to check
     * @param str forwarded unchanged; presumably the key field name (confirm against VerifyRecord)
     * @param i forwarded unchanged; presumably the expected key value (confirm against VerifyRecord)
     */
    protected void assertTombstone(SourceRecord sourceRecord, String str, int i) {
        VerifyRecord.isValidTombstone(sourceRecord, str, i);
    }

    /**
     * Verifies that the record is a valid tombstone by delegating to
     * {@code VerifyRecord.isValidTombstone}.
     *
     * @param sourceRecord the record to check
     */
    protected void assertTombstone(SourceRecord sourceRecord) {
        VerifyRecord.isValidTombstone(sourceRecord);
    }

    /**
     * Asserts that the record's source offset equals the expected map.
     *
     * @param sourceRecord the record whose offset is checked
     * @param map the expected offset
     */
    protected void assertOffset(SourceRecord sourceRecord, Map<String, ?> map) {
        final Map<String, ?> actualOffset = sourceRecord.sourceOffset();
        Assertions.assertThat(actualOffset).isEqualTo(map);
    }

    /**
     * Asserts that a single entry of the record's source offset matches the expected value,
     * using the same comparison rules as {@code assertSameValue}.
     *
     * @param sourceRecord the record whose offset is checked
     * @param str the offset key to look up
     * @param obj the expected value for that key
     */
    protected void assertOffset(SourceRecord sourceRecord, String str, Object obj) {
        final Object actual = sourceRecord.sourceOffset().get(str);
        assertSameValue(actual, obj);
    }

    /**
     * Asserts that a (possibly nested) field of the record's value matches the expected value.
     *
     * <p>The path is split on {@code /}; each segment is looked up in the {@link Struct} reached
     * so far. If a segment has to be resolved against something that is not a {@link Struct}
     * (including {@code null}), the test fails with a message naming the offending path prefix.
     *
     * @param sourceRecord the record whose value is inspected
     * @param str the field path, with nested field names separated by {@code /}
     * @param obj the expected value, compared using the rules of {@code assertSameValue}
     */
    protected void assertValueField(SourceRecord sourceRecord, String str, Object obj) {
        Object value = sourceRecord.value();
        String resolvedPath = null;
        for (String segment : str.split("/")) {
            if (!(value instanceof Struct)) {
                String location = resolvedPath == null ? "record value" : "'" + resolvedPath + "'";
                // A null value has no class to report; describe it explicitly so the intended
                // assertion failure is raised instead of a NullPointerException.
                String actualType = value == null ? "null" : value.getClass().getSimpleName();
                Assert.fail("Expected the " + location + " to be a Struct but was " + actualType + " in record: " + SchemaUtil.asString(sourceRecord));
            }
            value = ((Struct) value).get(segment);
            resolvedPath = resolvedPath == null ? segment : resolvedPath + "/" + segment;
        }
        assertSameValue(value, obj);
    }

    /**
     * Compares an actual value with an expected one, choosing the comparison by the type of
     * the expected value:
     * <ul>
     * <li>{@code Double}, {@code Float}, {@code BigDecimal}: compared as doubles with a
     * tolerance of 1% of the expected value's magnitude;</li>
     * <li>{@code Integer}, {@code Long}, {@code Short}: compared as longs;</li>
     * <li>{@code Boolean}: compared as booleans;</li>
     * <li>anything else: compared with {@code isEqualTo}.</li>
     * </ul>
     *
     * @param obj the actual value
     * @param obj2 the expected value
     */
    private void assertSameValue(Object obj, Object obj2) {
        if ((obj2 instanceof Double) || (obj2 instanceof Float) || (obj2 instanceof BigDecimal)) {
            double expected = ((Number) obj2).doubleValue();
            double actual = ((Number) obj).doubleValue();
            // Use the magnitude of the expected value: without Math.abs the tolerance would be
            // negative whenever the expected value is negative.
            Assertions.assertThat(actual).isEqualTo(expected, Delta.delta(Math.abs(0.01d * expected)));
            return;
        }
        if ((obj2 instanceof Integer) || (obj2 instanceof Long) || (obj2 instanceof Short)) {
            Assertions.assertThat(((Number) obj).longValue()).isEqualTo(((Number) obj2).longValue());
        } else if (obj2 instanceof Boolean) {
            Assertions.assertThat(((Boolean) obj).booleanValue()).isEqualTo(((Boolean) obj2).booleanValue());
        } else {
            Assertions.assertThat(obj).isEqualTo(obj2);
        }
    }

    /**
     * Verifies that the value matches its schema by delegating to
     * {@code VerifyRecord.schemaMatchesStruct}.
     *
     * @param schemaAndValue the schema/value pair to check
     */
    protected void assertSchemaMatchesStruct(SchemaAndValue schemaAndValue) {
        VerifyRecord.schemaMatchesStruct(schemaAndValue);
    }

    /**
     * Verifies that the struct matches the given schema by delegating to
     * {@code VerifyRecord.schemaMatchesStruct}.
     *
     * @param struct the struct to check
     * @param schema the schema it is checked against
     */
    protected void assertSchemaMatchesStruct(Struct struct, Schema schema) {
        VerifyRecord.schemaMatchesStruct(struct, schema);
    }

    /**
     * Asserts that the engine is running, describing the assertion as
     * "Engine should not fail due to an exception".
     */
    protected void assertEngineIsRunning() {
        final boolean running = engine.isRunning();
        final BooleanAssert described = (BooleanAssert) Assertions.assertThat(running).as("Engine should not fail due to an exception");
        described.isTrue();
    }

    /**
     * Validates the record by delegating to {@code VerifyRecord.isValid}.
     *
     * @param sourceRecord the record to validate
     */
    protected void validate(SourceRecord sourceRecord) {
        VerifyRecord.isValid(sourceRecord);
    }

    /**
     * Prints the record by delegating to {@code VerifyRecord.print}.
     *
     * @param sourceRecord the record to print
     */
    protected void print(SourceRecord sourceRecord) {
        VerifyRecord.print(sourceRecord);
    }

    /**
     * Writes the record to the debug output by delegating to {@code VerifyRecord.debug}.
     *
     * @param sourceRecord the record to write
     */
    protected void debug(SourceRecord sourceRecord) {
        VerifyRecord.debug(sourceRecord);
    }

    /**
     * Asserts that the configuration value for the field has exactly {@code i} error messages.
     *
     * @param config the validated configuration
     * @param field the field whose value is inspected
     * @param i the expected number of error messages
     */
    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int i) {
        final ConfigValue fieldValue = configValue(config, field.name());
        final int errorCount = fieldValue.errorMessages().size();
        Assertions.assertThat(errorCount).isEqualTo(i);
    }

    /**
     * Asserts that the number of error messages on the field's configuration value lies
     * within the inclusive range {@code [i, i2]}.
     *
     * @param config the validated configuration
     * @param field the field whose value is inspected
     * @param i the minimum number of error messages
     * @param i2 the maximum number of error messages
     */
    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int i, int i2) {
        final ConfigValue fieldValue = configValue(config, field.name());
        final int errorCount = fieldValue.errorMessages().size();
        Assertions.assertThat(errorCount).isGreaterThanOrEqualTo(i);
        Assertions.assertThat(errorCount).isLessThanOrEqualTo(i2);
    }

    /**
     * Asserts that the configuration value for the field has at least one error message.
     *
     * @param config the validated configuration
     * @param field the field whose value is inspected
     */
    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field) {
        final ConfigValue fieldValue = configValue(config, field.name());
        final int errorCount = fieldValue.errorMessages().size();
        Assertions.assertThat(errorCount).isGreaterThan(0);
    }

    /**
     * Fails the test if any of the given fields has error messages on its configuration value.
     * Fields without a configuration value are skipped.
     *
     * @param config the validated configuration
     * @param fieldArr the fields to check
     */
    protected void assertNoConfigurationErrors(Config config, io.debezium.config.Field... fieldArr) {
        for (io.debezium.config.Field candidate : fieldArr) {
            final ConfigValue fieldValue = configValue(config, candidate.name());
            if (fieldValue == null || fieldValue.errorMessages().isEmpty()) {
                continue;
            }
            Assert.fail("Error messages on field '" + candidate.name() + "': " + fieldValue.errorMessages());
        }
    }

    /**
     * Finds the configuration value with the given name.
     *
     * @param config the configuration to search
     * @param str the name of the configuration value
     * @return the first value whose name equals {@code str}, or {@code null} if there is none
     */
    protected ConfigValue configValue(Config config, String str) {
        for (ConfigValue candidate : config.configValues()) {
            if (candidate.name().equals(str)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Reads the last committed offset for a single partition.
     *
     * @param configuration the connector configuration
     * @param map the partition whose offset is requested
     * @return the value stored for {@code map} in the result of
     *         {@link #readLastCommittedOffsets(Configuration, Collection)}
     */
    protected <T> Map<String, Object> readLastCommittedOffset(Configuration configuration, Map<String, T> map) {
        final Map<Map<String, T>, Map<String, Object>> offsetsByPartition = readLastCommittedOffsets(configuration, Arrays.asList(map));
        return offsetsByPartition.get(map);
    }

    /**
     * Reads the last committed offsets for the given partitions from the file-based offset
     * store at {@code OFFSET_STORE_PATH}.
     *
     * <p>The configuration is extended with the engine name {@code testing-connector}, the
     * offset file location and a flush interval of 0. The internal key and value converters
     * are instantiated from it; when the value converter is a {@link JsonConverter}, a
     * {@code schemas.enable=false} setting is added to the configuration it is configured from.
     * The offset store is always stopped again, whether or not reading succeeds.
     *
     * @param configuration the connector configuration
     * @param collection the partitions whose offsets are requested
     * @return the offsets returned by the storage reader for the requested partitions
     */
    protected <T> Map<Map<String, T>, Map<String, Object>> readLastCommittedOffsets(Configuration configuration, Collection<Map<String, T>> collection) {
        final Configuration engineConfig = configuration.edit()
                .with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
                .with("offset.storage.file.filename", OFFSET_STORE_PATH)
                .with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
                .build();
        final String engineName = engineConfig.getString(EmbeddedEngine.ENGINE_NAME);

        // Key converter: configured from the settings under its own prefix.
        final Converter keyConverter = (Converter) engineConfig.getInstance(EmbeddedEngine.INTERNAL_KEY_CONVERTER_CLASS, Converter.class);
        final String keyPrefix = EmbeddedEngine.INTERNAL_KEY_CONVERTER_CLASS.name() + ".";
        keyConverter.configure(engineConfig.subset(keyPrefix, true).asMap(), true);

        // Value converter: a JSON converter additionally gets schemas disabled.
        final Converter valueConverter = (Converter) engineConfig.getInstance(EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS, Converter.class);
        final Configuration valueConverterConfig;
        if (valueConverter instanceof JsonConverter) {
            valueConverterConfig = engineConfig.edit().with(EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();
        } else {
            valueConverterConfig = engineConfig;
        }
        final String valuePrefix = EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS.name() + ".";
        valueConverter.configure(valueConverterConfig.subset(valuePrefix, true).asMap(), false);

        final Map<String, String> storeSettings = engineConfig.asMap(EmbeddedEngine.ALL_FIELDS);
        storeSettings.put("key.converter", JsonConverter.class.getName());
        storeSettings.put("value.converter", JsonConverter.class.getName());

        final FileOffsetBackingStore offsetStore = new FileOffsetBackingStore();
        offsetStore.configure(new EmbeddedEngine.EmbeddedConfig(storeSettings));
        offsetStore.start();
        try {
            return new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter).offsets(collection);
        } finally {
            offsetStore.stop();
        }
    }
}
