package io.debezium.connector.mysql;

import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.mysql.ChainedReader;
import io.debezium.connector.mysql.Reader;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.Threads;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mysql/ChainedReaderTest.class */
public class ChainedReaderTest {
    // Five distinct single-record batches; the tests compare poll() results against them by identity.
    private static final List<SourceRecord> RL1 = Collect.arrayListOf(record());
    private static final List<SourceRecord> RL2 = Collect.arrayListOf(record());
    private static final List<SourceRecord> RL3 = Collect.arrayListOf(record());
    private static final List<SourceRecord> RL4 = Collect.arrayListOf(record());
    private static final List<SourceRecord> RL5 = Collect.arrayListOf(record());

    // The batches, in the order in which a supplier created by records() hands them out.
    private static final List<List<SourceRecord>> SOURCE_RECORDS = Collect.arrayListOf(RL1, RL2, RL3, RL4, RL5);

    // Reader under test; every test method assigns a freshly built instance before using it.
    private ChainedReader reader;

    /* loaded from: input_file:io/debezium/connector/mysql/ChainedReaderTest$CompletingMockReader.class */
    /**
     * A {@link MockReader} whose {@link #continueReturningRecordsFromPolling()} always answers
     * {@code true}, so {@code poll()} keeps consulting the supplier even after {@code stop()} has
     * cleared the running flag.
     */
    public static class CompletingMockReader extends MockReader {
        /**
         * @param name                the reader's name
         * @param pollResultsSupplier source of the batches returned from {@code poll()}
         */
        public CompletingMockReader(String name, Supplier<List<SourceRecord>> pollResultsSupplier) {
            super(name, pollResultsSupplier);
        }

        @Override
        protected boolean continueReturningRecordsFromPolling() {
            // Unconditional: the running flag is deliberately not consulted here.
            return true;
        }
    }

    /* loaded from: input_file:io/debezium/connector/mysql/ChainedReaderTest$MockReader.class */
    /**
     * Simple {@link Reader} for these tests: {@code poll()} hands out whatever the configured
     * supplier produces, and the reader marks itself completed the first time a poll yields
     * {@code null}.
     */
    public static class MockReader implements Reader {
        private final AtomicBoolean running = new AtomicBoolean();
        private final AtomicBoolean completed = new AtomicBoolean();
        private final AtomicReference<Runnable> completionHandler = new AtomicReference<>();
        private final String name;
        private final Supplier<List<SourceRecord>> pollResultsSupplier;

        /**
         * @param name                the value returned from {@link #name()}
         * @param pollResultsSupplier source of the batches returned from {@link #poll()}
         */
        public MockReader(String name, Supplier<List<SourceRecord>> pollResultsSupplier) {
            this.name = name;
            this.pollResultsSupplier = pollResultsSupplier;
        }

        /**
         * @return {@code RUNNING} while the running flag is set; otherwise {@code STOPPED} once a
         *         poll has completed the reader, and {@code STOPPING} in every other case (which
         *         includes a reader that has never been started)
         */
        public Reader.State state() {
            if (this.running.get()) {
                return Reader.State.RUNNING;
            }
            if (this.completed.get()) {
                return Reader.State.STOPPED;
            }
            return Reader.State.STOPPING;
        }

        /** @return the name given at construction */
        public String name() {
            return this.name;
        }

        /**
         * Returns the supplier's next batch, provided
         * {@link #continueReturningRecordsFromPolling()} allows it. When there is nothing to
         * return, the registered completion handler (if any) is run, the reader is flagged as
         * completed and no longer running, and {@code null} is returned.
         */
        public List<SourceRecord> poll() throws InterruptedException {
            final List<SourceRecord> batch =
                    continueReturningRecordsFromPolling() ? this.pollResultsSupplier.get() : null;
            if (batch != null) {
                return batch;
            }

            // Nothing to hand out: run the handler first, then update the flags.
            final Runnable onCompletion = this.completionHandler.get();
            if (onCompletion != null) {
                onCompletion.run();
            }
            this.completed.set(true);
            this.running.set(false);
            return null;
        }

        /** Whether {@link #poll()} should ask the supplier for a batch; here, only while running. */
        protected boolean continueReturningRecordsFromPolling() {
            return this.running.get();
        }

        /** Asserts that the reader is not already running, then sets the running flag. */
        public void start() {
            final boolean alreadyRunning = this.running.get();
            Assertions.assertThat(alreadyRunning).isFalse();
            this.running.set(true);
        }

        /** Clears the running flag; the completed flag is left untouched. */
        public void stop() {
            this.running.set(false);
        }

        /** Registers the handler that {@link #poll()} runs when it has no batch to return. */
        public void uponCompletion(Runnable handler) {
            this.completionHandler.set(handler);
        }
    }

    /**
     * Creates a supplier that walks {@code SOURCE_RECORDS} with its own iterator: each call
     * returns the next batch, and {@code null} once the iterator is exhausted.
     */
    protected static Supplier<List<SourceRecord>> records() {
        final Iterator<List<SourceRecord>> remaining = SOURCE_RECORDS.iterator();
        return () -> remaining.hasNext() ? remaining.next() : null;
    }

    /** Builds a {@link SourceRecord} whose six constructor arguments are all {@code null}. */
    private static SourceRecord record() {
        final Map<String, ?> sourcePartition = null;
        final Map<String, ?> sourceOffset = null;
        final String topic = null;
        final Integer partition = null;
        final Schema valueSchema = null;
        final Object value = null;
        return new SourceRecord(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
    }

    /**
     * A chain built without any readers reports {@code STOPPED} both before and after
     * {@code start()}, and polling it only ever yields {@code null}.
     */
    @Test
    public void shouldNotStartWithoutReaders() throws InterruptedException {
        final ChainedReader emptyChain = new ChainedReader.Builder().build();
        this.reader = emptyChain;

        Assertions.assertThat(emptyChain.state()).isEqualTo(Reader.State.STOPPED);
        emptyChain.start();
        Assertions.assertThat(emptyChain.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Stops a single-reader chain after four of the five batches have been polled: the chain
     * reports {@code STOPPING} right after {@code stop()}, the next poll yields {@code null},
     * and from then on the chain is {@code STOPPED}.
     */
    @Test
    public void shouldStartAndStopSingleReaderBeforeReaderStopsItself() throws InterruptedException {
        final MockReader onlyReader = new MockReader("r1", records());
        this.reader = new ChainedReader.Builder()
                .addReader(onlyReader)
                .completionMessage("Stopped the r1 reader")
                .build();

        this.reader.start();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.RUNNING);

        // RL1 through RL4 are consumed; RL5 is still unread when the chain is stopped.
        for (List<SourceRecord> expected : SOURCE_RECORDS.subList(0, 4)) {
            Assertions.assertThat(this.reader.poll()).isSameAs(expected);
        }

        this.reader.stop();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.STOPPING);
        Assertions.assertThat(this.reader.poll()).isNull();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Polls a single-reader chain until its supplier runs dry: all five batches come back in
     * order, the following poll yields {@code null}, and the chain then reports {@code STOPPED}
     * without {@code stop()} ever being called.
     */
    @Test
    public void shouldStartSingleReaderThatStopsAutomatically() throws InterruptedException {
        final MockReader onlyReader = new MockReader("r2", records());
        this.reader = new ChainedReader.Builder()
                .addReader(onlyReader)
                .completionMessage("Stopped the r2 reader")
                .build();

        this.reader.start();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.RUNNING);

        // RL1 through RL5, in order.
        for (List<SourceRecord> expected : SOURCE_RECORDS) {
            Assertions.assertThat(this.reader.poll()).isSameAs(expected);
        }

        Assertions.assertThat(this.reader.poll()).isNull();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Chains two readers that each produce all five batches. After the first five batches the
     * test keeps polling, sleeping 100 ms between attempts, until a non-null batch arrives; it
     * fails if the timer built from {@code ConfigurationDefaults.RETURN_CONTROL_INTERVAL}
     * expires first. The second run of five batches is then checked, followed by the final
     * {@code null} poll and the {@code STOPPED} state.
     */
    @Test
    public void shouldStartAndStopMultipleReaders() throws InterruptedException {
        this.reader = new ChainedReader.Builder()
                .addReader(new MockReader("r3", records()))
                .addReader(new MockReader("r4", records()))
                .completionMessage("Stopped the r3+r4 reader")
                .build();

        this.reader.start();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.RUNNING);

        // First pass: RL1 through RL5, in order.
        for (List<SourceRecord> expected : SOURCE_RECORDS) {
            Assertions.assertThat(this.reader.poll()).isSameAs(expected);
        }

        // Polls may yield null for a while before the next batch shows up; retry until the timer expires.
        List<SourceRecord> firstOfSecondPass = this.reader.poll();
        final Threads.Timer timeout = Threads.timer(Clock.SYSTEM, ConfigurationDefaults.RETURN_CONTROL_INTERVAL);
        while (firstOfSecondPass == null) {
            if (timeout.expired()) {
                Assert.fail("Subsequent reader has not started");
            }
            Thread.sleep(100L);
            firstOfSecondPass = this.reader.poll();
        }

        // Second pass: the batch obtained above must be RL1, followed by RL2 through RL5.
        Assertions.assertThat(firstOfSecondPass).isSameAs(RL1);
        for (List<SourceRecord> expected : SOURCE_RECORDS.subList(1, 5)) {
            Assertions.assertThat(this.reader.poll()).isSameAs(expected);
        }

        Assertions.assertThat(this.reader.poll()).isNull();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Uses a {@link CompletingMockReader}, stopping the chain after two batches. The chain
     * reports {@code STOPPING}, yet the remaining three batches are still returned from
     * {@code poll()}; only after the subsequent {@code null} poll is the chain {@code STOPPED}.
     */
    @Test
    public void shouldStartAndStopReaderThatContinuesProducingItsRecordsAfterBeingStopped() throws InterruptedException {
        final CompletingMockReader onlyReader = new CompletingMockReader("r5", records());
        this.reader = new ChainedReader.Builder()
                .addReader(onlyReader)
                .completionMessage("Stopped the r5 reader")
                .build();

        this.reader.start();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.RUNNING);

        // RL1 and RL2 are polled while the chain is running.
        for (List<SourceRecord> expected : SOURCE_RECORDS.subList(0, 2)) {
            Assertions.assertThat(this.reader.poll()).isSameAs(expected);
        }

        this.reader.stop();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.STOPPING);

        // RL3 through RL5 are still delivered after stop() was requested.
        for (List<SourceRecord> expected : SOURCE_RECORDS.subList(2, 5)) {
            Assertions.assertThat(this.reader.poll()).isSameAs(expected);
        }

        Assertions.assertThat(this.reader.poll()).isNull();
        Assertions.assertThat(this.reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /** Polls the reader under test ten times in a row and asserts that every poll returns {@code null}. */
    protected void assertPollReturnsNoMoreRecords() throws InterruptedException {
        int remainingPolls = 10;
        while (remainingPolls-- > 0) {
            Assertions.assertThat(this.reader.poll()).isNull();
        }
    }
}
