package io.debezium.embedded;

import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.function.BooleanConsumer;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/embedded/AbstractConnectorTest.class */
public abstract class AbstractConnectorTest implements Testing {
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
    private ExecutorService executor;
    private EmbeddedEngine engine;
    private BlockingQueue<SourceRecord> consumedLines;
    protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5);
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private CountDownLatch latch;

    @Before
    public final void initializeConnectorTestFramework() throws Exception {
        resetBeforeEachTest();
        this.consumedLines = new ArrayBlockingQueue(getMaximumEnqueuedRecordCount());
        Testing.Files.delete(OFFSET_STORE_PATH);
    }

    @After
    public final void stopConnector() {
        stopConnector(null);
    }

    public void stopConnector(BooleanConsumer booleanConsumer) {
        try {
            if (this.engine != null && this.engine.isRunning()) {
                this.engine.stop();
                try {
                    this.engine.await(5L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
            if (this.executor != null) {
                Assertions.assertThat(this.executor.shutdownNow()).isEmpty();
                do {
                    try {
                    } catch (InterruptedException e2) {
                        Thread.interrupted();
                    }
                } while (!this.executor.awaitTermination(10L, TimeUnit.SECONDS));
            }
            if (this.engine != null && this.engine.isRunning()) {
                do {
                    try {
                    } catch (InterruptedException e3) {
                        Thread.interrupted();
                    }
                } while (!this.engine.await(5L, TimeUnit.SECONDS));
            }
            if (booleanConsumer != null) {
                booleanConsumer.accept(this.engine != null ? this.engine.isRunning() : false);
            }
        } finally {
            this.engine = null;
            this.executor = null;
        }
    }

    protected int getMaximumEnqueuedRecordCount() {
        return 100;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void start(Class<? extends SourceConnector> cls, Configuration configuration) {
        start(cls, configuration, (z, str, th) -> {
            if (z) {
                this.logger.info(str);
            } else {
                this.logger.error(str, th);
            }
        });
    }

    protected void start(Class<? extends SourceConnector> cls, Configuration configuration, EmbeddedEngine.CompletionCallback completionCallback) {
        Configuration build = Configuration.copy(configuration).with(EmbeddedEngine.ENGINE_NAME, "testing-connector").with(EmbeddedEngine.CONNECTOR_CLASS, cls.getName()).with("offset.storage.file.filename", OFFSET_STORE_PATH).with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0).build();
        this.latch = new CountDownLatch(1);
        EmbeddedEngine.CompletionCallback completionCallback2 = (z, str, th) -> {
            if (completionCallback != null) {
                try {
                    completionCallback.handle(z, str, th);
                } finally {
                    this.latch.countDown();
                }
            }
        };
        EmbeddedEngine.Builder using = EmbeddedEngine.create().using(build);
        BlockingQueue<SourceRecord> blockingQueue = this.consumedLines;
        blockingQueue.getClass();
        this.engine = using.notifying((v1) -> {
            r2.add(v1);
        }).using(getClass().getClassLoader()).using(completionCallback2).build();
        Assertions.assertThat(this.executor).isNull();
        this.executor = Executors.newFixedThreadPool(1);
        this.executor.execute(this.engine);
    }

    protected void setConsumeTimeout(long j, TimeUnit timeUnit) {
        if (j < 0) {
            throw new IllegalArgumentException("The timeout may not be negative");
        }
        this.pollTimeoutInMs = timeUnit.toMillis(j);
    }

    protected SourceRecord consumeRecord() throws InterruptedException {
        return this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
    }

    protected int consumeRecords(int i) throws InterruptedException {
        return consumeRecords(i, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int consumeRecords(int i, Consumer<SourceRecord> consumer) throws InterruptedException {
        int i2 = 0;
        for (int i3 = 0; i3 != i; i3++) {
            SourceRecord poll = this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
            if (poll != null) {
                i2++;
                if (consumer != null) {
                    consumer.accept(poll);
                }
            }
        }
        return i2;
    }

    protected int consumeAvailableRecords(Consumer<SourceRecord> consumer) {
        LinkedList linkedList = new LinkedList();
        this.consumedLines.drainTo(linkedList);
        if (consumer != null) {
            linkedList.forEach(consumer);
        }
        return linkedList.size();
    }

    protected boolean waitForAvailableRecords(long j, TimeUnit timeUnit) {
        Assertions.assertThat(j).isGreaterThanOrEqualTo(0L);
        long currentTimeMillis = System.currentTimeMillis() + timeUnit.toMillis(j);
        while (System.currentTimeMillis() < currentTimeMillis && this.consumedLines.isEmpty()) {
        }
        return !this.consumedLines.isEmpty();
    }

    protected void assertConnectorIsRunning() {
        Assertions.assertThat(this.engine.isRunning()).isTrue();
    }

    protected void assertConnectorNotRunning() {
        Assertions.assertThat(this.engine.isRunning()).isFalse();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertNoRecordsToConsume() {
        Assertions.assertThat(this.consumedLines.isEmpty()).isTrue();
    }

    protected void print(SourceRecord sourceRecord) {
        StringBuilder sb = new StringBuilder("SourceRecord{");
        sb.append("sourcePartition=").append(sourceRecord.sourcePartition());
        sb.append(", sourceOffset=").append(sourceRecord.sourceOffset());
        sb.append(", topic=").append(sourceRecord.topic());
        sb.append(", kafkaPartition=").append(sourceRecord.kafkaPartition());
        sb.append(", key=");
        append(sourceRecord.key(), sb);
        sb.append(", value=");
        append(sourceRecord.value(), sb);
        sb.append("}");
        Testing.print(sb.toString());
    }

    protected void append(Object obj, StringBuilder sb) {
        if (obj == null) {
            sb.append("null");
        } else if (obj instanceof Schema) {
            Schema schema = (Schema) obj;
            sb.append('{');
            sb.append("name=").append(schema.name());
            sb.append(", type=").append(schema.type());
            sb.append(", optional=").append(schema.isOptional());
            sb.append(", fields=");
            boolean z = true;
            for (Field field : schema.fields()) {
                if (z) {
                    z = false;
                } else {
                    sb.append(", ");
                }
                sb.append("name=").append(field.name());
                sb.append(", index=").append(field.index());
                sb.append(", schema=");
                append(field.schema(), sb);
            }
            sb.append('}');
        }
        if (obj instanceof Struct) {
            Struct struct = (Struct) obj;
            sb.append('{');
            boolean z2 = true;
            for (Field field2 : struct.schema().fields()) {
                if (z2) {
                    z2 = false;
                } else {
                    sb.append(", ");
                }
                sb.append(field2.name()).append('=');
                append(struct.get(field2), sb);
            }
            sb.append('}');
            return;
        }
        if (obj instanceof Map) {
            sb.append('{');
            boolean z3 = true;
            for (Map.Entry entry : ((Map) obj).entrySet()) {
                if (z3) {
                    z3 = false;
                } else {
                    sb.append(", ");
                }
                append(entry.getKey(), sb);
                sb.append('=');
                append(entry.getValue(), sb);
            }
            sb.append('}');
            return;
        }
        if (!(obj instanceof List)) {
            if (obj instanceof String) {
                sb.append('\"').append(obj.toString()).append('\"');
                return;
            } else {
                sb.append(obj.toString());
                return;
            }
        }
        sb.append('[');
        boolean z4 = true;
        for (Object obj2 : (List) obj) {
            if (z4) {
                z4 = false;
            } else {
                sb.append(", ");
            }
            append(obj2, sb);
        }
        sb.append(']');
    }
}
