package io.debezium.embedded;

import io.debezium.config.Configuration;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.fest.assertions.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/embedded/EmbeddedEngineTest.class */
public class EmbeddedEngineTest extends AbstractConnectorTest {
    private static final int NUMBER_OF_LINES = 10;
    private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
    private static final Charset UTF8 = StandardCharsets.UTF_8;
    private File inputFile;
    private int nextConsumedLineNumber;
    private int linesAdded;
    private Configuration connectorConfig;

    @Before
    public void beforeEach() throws Exception {
        this.nextConsumedLineNumber = 1;
        this.linesAdded = 0;
        Testing.Files.delete(TEST_FILE_PATH);
        this.inputFile = Testing.Files.createTestingFile(TEST_FILE_PATH);
        this.connectorConfig = Configuration.create().with("file", TEST_FILE_PATH).with("topic", "topicX").build();
    }

    @Test
    public void shouldStartAndUseFileConnectorUsingMemoryOffsetStorage() throws Exception {
        appendLinesToSource(NUMBER_OF_LINES);
        start(FileStreamSourceConnector.class, this.connectorConfig);
        consumeLines(NUMBER_OF_LINES);
        assertNoRecordsToConsume();
        for (int i = 1; i != 5; i++) {
            appendLinesToSource(NUMBER_OF_LINES);
            consumeLines(NUMBER_OF_LINES);
            assertNoRecordsToConsume();
        }
        stopConnector();
        appendLinesToSource(NUMBER_OF_LINES);
        assertNoRecordsToConsume();
        start(FileStreamSourceConnector.class, this.connectorConfig);
        consumeLines(NUMBER_OF_LINES);
        assertNoRecordsToConsume();
    }

    protected void appendLinesToSource(int i) throws IOException {
        CharSequence[] charSequenceArr = new CharSequence[i];
        for (int i2 = 0; i2 != i; i2++) {
            charSequenceArr[i2] = generateLine(this.linesAdded + i2 + 1);
        }
        Files.write(this.inputFile.toPath(), Collect.arrayListOf(charSequenceArr), UTF8, StandardOpenOption.APPEND);
        this.linesAdded += i;
    }

    protected String generateLine(int i) {
        return "Generated line number " + i;
    }

    protected void consumeLines(int i) throws InterruptedException {
        consumeRecords(i, sourceRecord -> {
            Assertions.assertThat(sourceRecord.value().toString()).isEqualTo(generateLine(this.nextConsumedLineNumber));
            this.nextConsumedLineNumber++;
        });
    }
}
