package io.debezium.connector.mysql.legacy;

import io.debezium.connector.mysql.legacy.ParallelSnapshotReader;
import io.debezium.connector.mysql.legacy.Reader;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/connector/mysql/legacy/ParallelSnapshotReaderTest.class */
/**
 * Unit tests for {@link ParallelSnapshotReader} and its nested
 * {@link ParallelSnapshotReader.ParallelHaltingPredicate}.
 *
 * <p>The readers handed to {@code ParallelSnapshotReader} are Mockito mocks. Following the test
 * names, the first constructor argument is referred to as the "old" reader and the second as the
 * "new" reader; the third argument is a further {@code BinlogReader} mock (presumably the binlog
 * reader chained after the new snapshot reader -- confirm against {@code ParallelSnapshotReader}).
 */
public class ParallelSnapshotReaderTest {

    /** Duration handed to every {@code ParallelHaltingPredicate} built by these tests. */
    private static final Duration HALTING_WINDOW = Duration.ofMinutes(5);

    /** Starting the parallel reader puts it in RUNNING state and starts the old and new readers. */
    @Test
    public void startStartsBothReaders() {
        BinlogReader oldBinlogReader = Mockito.mock(BinlogReader.class);
        SnapshotReader newSnapshotReader = Mockito.mock(SnapshotReader.class);
        BinlogReader newBinlogReader = Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader =
                new ParallelSnapshotReader(oldBinlogReader, newSnapshotReader, newBinlogReader);

        parallelSnapshotReader.start();

        // JUnit's contract is (expected, actual); keeping that order makes failure messages accurate.
        Assert.assertSame(Reader.State.RUNNING, parallelSnapshotReader.state());
        Mockito.verify(oldBinlogReader).start();
        Mockito.verify(newSnapshotReader).start();
    }

    /** When both readers supply records, a single poll returns the records of both. */
    @Test
    public void pollCombinesBothReadersPolls() throws InterruptedException {
        BinlogReader oldBinlogReader = Mockito.mock(BinlogReader.class);
        SnapshotReader newSnapshotReader = Mockito.mock(SnapshotReader.class);
        BinlogReader newBinlogReader = Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader =
                new ParallelSnapshotReader(oldBinlogReader, newSnapshotReader, newBinlogReader);

        SourceRecord oldRecord = Mockito.mock(SourceRecord.class);
        SourceRecord newRecord = Mockito.mock(SourceRecord.class);
        Mockito.when(oldBinlogReader.isRunning()).thenReturn(true);
        Mockito.when(oldBinlogReader.poll()).thenReturn(listOf(oldRecord));
        Mockito.when(newSnapshotReader.poll()).thenReturn(listOf(newRecord));

        parallelSnapshotReader.start();
        List<SourceRecord> polled = parallelSnapshotReader.poll();

        Assert.assertEquals(2, polled.size());
        Assert.assertTrue(polled.contains(oldRecord));
        Assert.assertTrue(polled.contains(newRecord));
    }

    /** When the old reader reports it is not running, only the new reader's records are returned. */
    @Test
    public void pollReturnsNewIfOldReaderIsStopped() throws InterruptedException {
        BinlogReader oldBinlogReader = Mockito.mock(BinlogReader.class);
        SnapshotReader newSnapshotReader = Mockito.mock(SnapshotReader.class);
        BinlogReader newBinlogReader = Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader =
                new ParallelSnapshotReader(oldBinlogReader, newSnapshotReader, newBinlogReader);

        SourceRecord newRecord = Mockito.mock(SourceRecord.class);
        Mockito.when(oldBinlogReader.isRunning()).thenReturn(false);
        // Polling the stopped old reader would throw, so the test fails if it is polled anyway.
        Mockito.when(oldBinlogReader.poll()).thenThrow(new InterruptedException());
        Mockito.when(newSnapshotReader.poll()).thenReturn(listOf(newRecord));

        parallelSnapshotReader.start();
        List<SourceRecord> polled = parallelSnapshotReader.poll();

        Assert.assertEquals(1, polled.size());
        Assert.assertTrue(polled.contains(newRecord));
    }

    /**
     * When only the old reader supplies records, exactly those records are returned.
     *
     * <p>The new snapshot reader is deliberately left unstubbed, so its {@code poll()} answers with
     * Mockito's default value, and the parallel reader is polled without being started.
     */
    @Test
    public void pollReturnsOldIfNewReaderIsStopped() throws InterruptedException {
        BinlogReader oldBinlogReader = Mockito.mock(BinlogReader.class);
        SnapshotReader newSnapshotReader = Mockito.mock(SnapshotReader.class);
        BinlogReader newBinlogReader = Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader =
                new ParallelSnapshotReader(oldBinlogReader, newSnapshotReader, newBinlogReader);

        SourceRecord oldRecord = Mockito.mock(SourceRecord.class);
        Mockito.when(oldBinlogReader.isRunning()).thenReturn(true);
        Mockito.when(oldBinlogReader.poll()).thenReturn(listOf(oldRecord));

        List<SourceRecord> polled = parallelSnapshotReader.poll();

        Assert.assertEquals(1, polled.size());
        Assert.assertTrue(polled.contains(oldRecord));
    }

    /** With the old reader not running and the parallel reader never started, poll yields null. */
    @Test
    public void pollReturnsNullIfBothReadersAreStopped() throws InterruptedException {
        BinlogReader oldBinlogReader = Mockito.mock(BinlogReader.class);
        SnapshotReader newSnapshotReader = Mockito.mock(SnapshotReader.class);
        BinlogReader newBinlogReader = Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader =
                new ParallelSnapshotReader(oldBinlogReader, newSnapshotReader, newBinlogReader);

        Mockito.when(oldBinlogReader.isRunning()).thenReturn(false);
        // Polling the stopped old reader would throw, so the test fails if it is polled anyway.
        Mockito.when(oldBinlogReader.poll()).thenThrow(new InterruptedException());
        Mockito.when(newBinlogReader.poll()).thenReturn(null);

        Assert.assertNull(parallelSnapshotReader.poll());
    }

    /** Stopping the parallel reader puts it in STOPPED state and stops the old and new readers. */
    @Test
    public void testStopStopsBothReaders() {
        BinlogReader oldBinlogReader = Mockito.mock(BinlogReader.class);
        SnapshotReader newSnapshotReader = Mockito.mock(SnapshotReader.class);
        BinlogReader newBinlogReader = Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader =
                new ParallelSnapshotReader(oldBinlogReader, newSnapshotReader, newBinlogReader);

        parallelSnapshotReader.start();
        parallelSnapshotReader.stop();

        // assertSame reports both values on failure, unlike assertTrue(a == b).
        Assert.assertSame(Reader.State.STOPPED, parallelSnapshotReader.state());
        Mockito.verify(oldBinlogReader).stop();
        Mockito.verify(newSnapshotReader).stop();
    }

    /**
     * A record timestamped twice the configured duration in the past is accepted and neither
     * flag is changed.
     */
    @Test
    public void testHaltingPredicateHonorsTimeRange() {
        AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
        AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false);
        ParallelSnapshotReader.ParallelHaltingPredicate predicate =
                new ParallelSnapshotReader.ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, HALTING_WINDOW);

        Instant outsideWindow = Instant.now().minus(HALTING_WINDOW.multipliedBy(2));
        boolean accepted = predicate.accepts(createSourceRecordWithTimestamp(outsideWindow));

        Assert.assertTrue(accepted);
        Assert.assertFalse(thisReaderNearEnd.get());
        Assert.assertFalse(otherReaderNearEnd.get());
    }

    /**
     * A record timestamped "now" is accepted while the second flag is false, and the first flag
     * is flipped to true.
     */
    @Test
    public void testHaltingPredicateFlipsthisReaderNearEnd() {
        AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
        AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false);
        ParallelSnapshotReader.ParallelHaltingPredicate predicate =
                new ParallelSnapshotReader.ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, HALTING_WINDOW);

        boolean accepted = predicate.accepts(createSourceRecordWithTimestamp(Instant.now()));

        Assert.assertTrue(accepted);
        Assert.assertTrue(thisReaderNearEnd.get());
        Assert.assertFalse(otherReaderNearEnd.get());
    }

    /**
     * A record timestamped "now" is rejected when the second flag is already true; afterwards
     * both flags are true.
     */
    @Test
    public void testHaltingPredicateHalts() {
        AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
        AtomicBoolean otherReaderNearEnd = new AtomicBoolean(true);
        ParallelSnapshotReader.ParallelHaltingPredicate predicate =
                new ParallelSnapshotReader.ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, HALTING_WINDOW);

        boolean accepted = predicate.accepts(createSourceRecordWithTimestamp(Instant.now()));

        Assert.assertFalse(accepted);
        Assert.assertTrue(thisReaderNearEnd.get());
        Assert.assertTrue(otherReaderNearEnd.get());
    }

    /**
     * Builds a fresh, mutable list holding just the given record.
     *
     * <p>A mutable {@link ArrayList} is used on purpose rather than an immutable singleton list:
     * the list is handed to the reader under test, which may modify what a delegate returned
     * (NOTE(review): confirm against {@code ParallelSnapshotReader.poll()}).
     *
     * @param record the single element of the list
     * @return a new {@code ArrayList} containing {@code record}
     */
    private static List<SourceRecord> listOf(SourceRecord record) {
        List<SourceRecord> records = new ArrayList<>();
        records.add(record);
        return records;
    }

    /**
     * Creates a source record whose offset map holds the given instant, in epoch seconds, under
     * the {@code "ts_sec"} key. Every other constructor argument is {@code null}.
     *
     * @param instant the timestamp to store in the record's offset
     * @return the new source record
     */
    private SourceRecord createSourceRecordWithTimestamp(Instant instant) {
        Map<String, ?> sourcePartition = null;
        Map<String, ?> sourceOffset = Collections.singletonMap("ts_sec", Long.valueOf(instant.getEpochSecond()));
        String topic = null;
        Schema valueSchema = null;
        Object value = null;
        return new SourceRecord(sourcePartition, sourceOffset, topic, valueSchema, value);
    }
}
