/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.config.Configuration;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.function.BooleanConsumer;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractConnectorTest
implements Testing {
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath((String)"file-connector-offsets.txt").toAbsolutePath();
    private ExecutorService executor;
    private EmbeddedEngine engine;
    private BlockingQueue<SourceRecord> consumedLines;
    protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5L);
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private CountDownLatch latch;
    private JsonConverter keyJsonConverter = new JsonConverter();
    private JsonConverter valueJsonConverter = new JsonConverter();
    private JsonDeserializer keyJsonDeserializer = new JsonDeserializer();
    private JsonDeserializer valueJsonDeserializer = new JsonDeserializer();

    @Before
    public final void initializeConnectorTestFramework() {
        LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"test");
        this.keyJsonConverter = new JsonConverter();
        this.valueJsonConverter = new JsonConverter();
        this.keyJsonDeserializer = new JsonDeserializer();
        this.valueJsonDeserializer = new JsonDeserializer();
        Configuration converterConfig = Configuration.create().build();
        Configuration deserializerConfig = Configuration.create().build();
        this.keyJsonConverter.configure(converterConfig.asMap(), true);
        this.valueJsonConverter.configure(converterConfig.asMap(), false);
        this.keyJsonDeserializer.configure(deserializerConfig.asMap(), true);
        this.valueJsonDeserializer.configure(deserializerConfig.asMap(), false);
        this.resetBeforeEachTest();
        this.consumedLines = new ArrayBlockingQueue<SourceRecord>(this.getMaximumEnqueuedRecordCount());
        Testing.Files.delete((Path)OFFSET_STORE_PATH);
        OFFSET_STORE_PATH.getParent().toFile().mkdirs();
    }

    @After
    public final void stopConnector() {
        this.stopConnector(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopConnector(BooleanConsumer callback) {
        try {
            this.logger.info("Stopping the connector");
            if (this.engine != null && this.engine.isRunning()) {
                this.engine.stop();
                try {
                    this.engine.await(8L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
            if (this.executor != null) {
                List<Runnable> neverRunTasks = this.executor.shutdownNow();
                Assertions.assertThat(neverRunTasks).isEmpty();
                try {
                    while (!this.executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                    }
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
            if (this.engine != null && this.engine.isRunning()) {
                try {
                    while (!this.engine.await(5L, TimeUnit.SECONDS)) {
                    }
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
            if (callback != null) {
                callback.accept(this.engine != null ? this.engine.isRunning() : false);
            }
        }
        finally {
            this.engine = null;
            this.executor = null;
        }
    }

    protected int getMaximumEnqueuedRecordCount() {
        return 100;
    }

    protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig) {
        this.start(connectorClass, connectorConfig, (success, msg, error) -> {
            if (success) {
                this.logger.info(msg);
            } else {
                this.logger.error(msg, error);
            }
        });
    }

    protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig, EmbeddedEngine.CompletionCallback callback) {
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.copy((Configuration)connectorConfig).with(EmbeddedEngine.ENGINE_NAME, "testing-connector")).with(EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName())).with("offset.storage.file.filename", (Object)OFFSET_STORE_PATH)).with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)).build();
        this.latch = new CountDownLatch(1);
        EmbeddedEngine.CompletionCallback wrapperCallback = (success, msg, error) -> {
            try {
                if (callback != null) {
                    callback.handle(success, msg, error);
                }
            }
            finally {
                this.latch.countDown();
            }
            Testing.debug((Object)"Stopped connector");
        };
        this.engine = EmbeddedEngine.create().using(config).notifying(record -> {
            try {
                this.consumedLines.put((SourceRecord)record);
            }
            catch (InterruptedException e) {
                Thread.interrupted();
            }
        }).using(this.getClass().getClassLoader()).using(wrapperCallback).build();
        Assertions.assertThat((Object)this.executor).isNull();
        this.executor = Executors.newFixedThreadPool(1);
        this.executor.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            this.engine.run();
        });
    }

    protected void setConsumeTimeout(long timeout, TimeUnit unit) {
        if (timeout < 0L) {
            throw new IllegalArgumentException("The timeout may not be negative");
        }
        this.pollTimeoutInMs = unit.toMillis(timeout);
    }

    protected SourceRecord consumeRecord() throws InterruptedException {
        return this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
    }

    protected int consumeRecords(int numberOfRecords) throws InterruptedException {
        return this.consumeRecords(numberOfRecords, null);
    }

    protected int consumeRecords(int numberOfRecords, Consumer<SourceRecord> recordConsumer) throws InterruptedException {
        int recordsConsumed = 0;
        while (recordsConsumed < numberOfRecords) {
            SourceRecord record = this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
            if (record != null) {
                ++recordsConsumed;
                if (recordConsumer != null) {
                    recordConsumer.accept(record);
                }
                if (Testing.Debug.isEnabled()) {
                    Testing.debug((Object)("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords - recordsConsumed) + " more)"));
                    this.debug(record);
                    continue;
                }
                if (!Testing.Print.isEnabled()) continue;
                Testing.print((Object)("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords - recordsConsumed) + " more)"));
                this.print(record);
                continue;
            }
            return recordsConsumed;
        }
        return recordsConsumed;
    }

    protected SourceRecords consumeRecordsByTopic(int numRecords) throws InterruptedException {
        SourceRecords records = new SourceRecords();
        this.consumeRecords(numRecords, records::add);
        return records;
    }

    protected int consumeAvailableRecords(Consumer<SourceRecord> recordConsumer) {
        LinkedList records = new LinkedList();
        this.consumedLines.drainTo(records);
        if (recordConsumer != null) {
            records.forEach(recordConsumer);
        }
        return records.size();
    }

    protected boolean waitForAvailableRecords(long timeout, TimeUnit unit) {
        Assertions.assertThat((long)timeout).isGreaterThanOrEqualTo(0L);
        long now = System.currentTimeMillis();
        long stop = now + unit.toMillis(timeout);
        while (System.currentTimeMillis() < stop && this.consumedLines.isEmpty()) {
        }
        return !this.consumedLines.isEmpty();
    }

    protected void assertConnectorIsRunning() {
        Assertions.assertThat((boolean)this.engine.isRunning()).isTrue();
    }

    protected void assertConnectorNotRunning() {
        Assertions.assertThat((boolean)this.engine.isRunning()).isFalse();
    }

    protected void assertNoRecordsToConsume() {
        Assertions.assertThat((boolean)this.consumedLines.isEmpty()).isTrue();
    }

    protected void assertKey(SourceRecord record, String pkField, int pk) {
        VerifyRecord.hasValidKey((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertInsert(SourceRecord record, String pkField, int pk) {
        VerifyRecord.isValidInsert((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertUpdate(SourceRecord record, String pkField, int pk) {
        VerifyRecord.isValidUpdate((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertDelete(SourceRecord record, String pkField, int pk) {
        VerifyRecord.isValidDelete((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertTombstone(SourceRecord record, String pkField, int pk) {
        VerifyRecord.isValidTombstone((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertTombstone(SourceRecord record) {
        VerifyRecord.isValidTombstone((SourceRecord)record);
    }

    protected void assertSchemaMatchesStruct(SchemaAndValue value) {
        VerifyRecord.schemaMatchesStruct((SchemaAndValue)value);
    }

    protected void assertSchemaMatchesStruct(Struct struct, Schema schema) {
        VerifyRecord.schemaMatchesStruct((Struct)struct, (Schema)schema);
    }

    protected void validate(SourceRecord record) {
        VerifyRecord.isValid((SourceRecord)record);
    }

    protected void print(SourceRecord record) {
        VerifyRecord.print((SourceRecord)record);
    }

    protected void debug(SourceRecord record) {
        VerifyRecord.debug((SourceRecord)record);
    }

    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int numErrors) {
        ConfigValue value = this.configValue(config, field.name());
        Assertions.assertThat((int)value.errorMessages().size()).isEqualTo(numErrors);
    }

    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int minErrorsInclusive, int maxErrorsInclusive) {
        ConfigValue value = this.configValue(config, field.name());
        Assertions.assertThat((int)value.errorMessages().size()).isGreaterThanOrEqualTo(minErrorsInclusive);
        Assertions.assertThat((int)value.errorMessages().size()).isLessThanOrEqualTo(maxErrorsInclusive);
    }

    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field) {
        ConfigValue value = this.configValue(config, field.name());
        Assertions.assertThat((int)value.errorMessages().size()).isGreaterThan(0);
    }

    protected void assertNoConfigurationErrors(Config config, io.debezium.config.Field ... fields) {
        for (io.debezium.config.Field field : fields) {
            ConfigValue value = this.configValue(config, field.name());
            if (value == null || value.errorMessages().isEmpty()) continue;
            Assert.fail((String)("Error messages on field '" + field.name() + "': " + value.errorMessages()));
        }
    }

    protected ConfigValue configValue(Config config, String fieldName) {
        return config.configValues().stream().filter(value -> value.name().equals(fieldName)).findFirst().orElse(null);
    }

    protected class SourceRecords {
        private final List<SourceRecord> records = new ArrayList<SourceRecord>();
        private final Map<String, List<SourceRecord>> recordsByTopic = new HashMap<String, List<SourceRecord>>();
        private final Map<String, List<SourceRecord>> ddlRecordsByDbName = new HashMap<String, List<SourceRecord>>();

        protected SourceRecords() {
        }

        public void add(SourceRecord record) {
            this.records.add(record);
            this.recordsByTopic.computeIfAbsent(record.topic(), topicName -> new ArrayList()).add(record);
            String dbName = this.getAffectedDatabase(record);
            if (dbName != null) {
                this.ddlRecordsByDbName.computeIfAbsent(dbName, key -> new ArrayList()).add(record);
            }
        }

        protected String getAffectedDatabase(SourceRecord record) {
            Field dbField;
            Struct value = (Struct)record.value();
            if (value != null && (dbField = value.schema().field("databaseName")) != null) {
                return value.getString(dbField.name());
            }
            return null;
        }

        public List<SourceRecord> ddlRecordsForDatabase(String dbName) {
            return this.ddlRecordsByDbName.get(dbName);
        }

        public Set<String> databaseNames() {
            return this.ddlRecordsByDbName.keySet();
        }

        public List<SourceRecord> recordsForTopic(String topicName) {
            return this.recordsByTopic.get(topicName);
        }

        public Set<String> topics() {
            return this.recordsByTopic.keySet();
        }

        public void forEachInTopic(String topic, Consumer<SourceRecord> consumer) {
            this.recordsForTopic(topic).forEach(consumer);
        }

        public void forEach(Consumer<SourceRecord> consumer) {
            this.records.forEach(consumer);
        }

        public List<SourceRecord> allRecordsInOrder() {
            return Collections.unmodifiableList(this.records);
        }

        public void print() {
            Testing.print((Object)("" + this.topics().size() + " topics: " + this.topics()));
            this.recordsByTopic.forEach((? super K k, ? super V v) -> Testing.print((Object)(" - topic:'" + k + "'; # of events = " + v.size())));
            Testing.print((Object)"Records:");
            this.records.forEach((? super T record) -> AbstractConnectorTest.this.print((SourceRecord)record));
        }
    }
}

