package io.debezium.server.kinesis;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.inject.Instance;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
/* loaded from: input_file:io/debezium/server/kinesis/KinesisUnitTest.class */
public class KinesisUnitTest {
    private KinesisChangeConsumer kinesisChangeConsumer;
    private KinesisClient spyClient;
    private AtomicInteger counter;
    private AtomicBoolean threwException;
    List<ChangeEvent<Object, Object>> changeEvents;
    DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer;

    @BeforeEach
    public void setup() {
        this.counter = new AtomicInteger(0);
        this.threwException = new AtomicBoolean(false);
        this.changeEvents = createChangeEvents(500, "key", "destination");
        this.committer = RecordCommitter();
        this.spyClient = (KinesisClient) Mockito.spy((KinesisClient) KinesisClient.builder().region(Region.of(KinesisTestConfigSource.KINESIS_REGION)).credentialsProvider(ProfileCredentialsProvider.create("default")).build());
        Instance instance = (Instance) Mockito.mock(Instance.class);
        Mockito.when(Boolean.valueOf(instance.isResolvable())).thenReturn(true);
        Mockito.when((KinesisClient) instance.get()).thenReturn(this.spyClient);
        this.kinesisChangeConsumer = new KinesisChangeConsumer();
        this.kinesisChangeConsumer.customClient = instance;
    }

    @AfterEach
    public void tearDown() {
        Mockito.reset(new KinesisClient[]{this.spyClient});
    }

    private static List<ChangeEvent<Object, Object>> createChangeEvents(int i, String str, String str2) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            ChangeEvent changeEvent = (ChangeEvent) Mockito.mock(ChangeEvent.class);
            Mockito.when(changeEvent.key()).thenReturn(str);
            Mockito.when(changeEvent.value()).thenReturn(Integer.toString(i2));
            Mockito.when(changeEvent.destination()).thenReturn(str2);
            Header header = (Header) Mockito.mock(Header.class);
            Mockito.when(header.getKey()).thenReturn(str);
            Mockito.when(header.getValue()).thenReturn(Integer.toString(i2));
            Mockito.when(changeEvent.headers()).thenReturn(List.of(header));
            arrayList.add(changeEvent);
        }
        return arrayList;
    }

    private static DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> RecordCommitter() {
        return (DebeziumEngine.RecordCommitter) Mockito.mock(DebeziumEngine.RecordCommitter.class);
    }

    @Test
    public void testValidResponseWithErrorCode() throws Exception {
        ((KinesisClient) Mockito.doAnswer(invocationOnMock -> {
            List records = ((PutRecordsRequest) invocationOnMock.getArgument(0)).records();
            this.counter.incrementAndGet();
            return PutRecordsResponse.builder().failedRecordCount(Integer.valueOf(records.size())).records((List) records.stream().map(putRecordsRequestEntry -> {
                return (PutRecordsResultEntry) PutRecordsResultEntry.builder().errorCode("ProvisionedThroughputExceededException").errorMessage("The request rate for the stream is too high").build();
            }).collect(Collectors.toList())).build();
        }).when(this.spyClient)).putRecords((PutRecordsRequest) ArgumentMatchers.any(PutRecordsRequest.class));
        try {
            this.kinesisChangeConsumer.connect();
            this.kinesisChangeConsumer.handleBatch(this.changeEvents, RecordCommitter());
        } catch (Exception e) {
            this.threwException.getAndSet(true);
        }
        Assertions.assertTrue(this.threwException.get());
        Assertions.assertEquals(5, this.counter.get());
    }

    @Test
    public void testExceptionWhileWritingData() throws Exception {
        ((KinesisClient) Mockito.doAnswer(invocationOnMock -> {
            this.counter.incrementAndGet();
            throw KinesisException.builder().message("Kinesis Exception").build();
        }).when(this.spyClient)).putRecords((PutRecordsRequest) ArgumentMatchers.any(PutRecordsRequest.class));
        try {
            this.kinesisChangeConsumer.connect();
            this.kinesisChangeConsumer.handleBatch(this.changeEvents, this.committer);
        } catch (Exception e) {
            this.threwException.getAndSet(true);
        }
        Assertions.assertTrue(this.threwException.get());
        Assertions.assertEquals(5, this.counter.get());
    }

    @Test
    public void testResendFailedRecords() throws Exception {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ((KinesisClient) Mockito.doAnswer(invocationOnMock -> {
            PutRecordsResultEntry putRecordsResultEntry;
            ArrayList arrayList3 = new ArrayList();
            List records = ((PutRecordsRequest) invocationOnMock.getArgument(0)).records();
            this.counter.incrementAndGet();
            if (!atomicBoolean.get()) {
                Iterator it = records.iterator();
                while (it.hasNext()) {
                    arrayList2.add((PutRecordsRequestEntry) it.next());
                    arrayList3.add((PutRecordsResultEntry) PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build());
                }
                return PutRecordsResponse.builder().failedRecordCount(0).records(arrayList3).build();
            }
            for (int i = 0; i < records.size(); i++) {
                if (i < 100) {
                    putRecordsResultEntry = (PutRecordsResultEntry) PutRecordsResultEntry.builder().errorCode("ProvisionedThroughputExceededException").errorMessage("The request rate for the stream is too high").build();
                    arrayList.add((PutRecordsRequestEntry) records.get(i));
                } else {
                    putRecordsResultEntry = (PutRecordsResultEntry) PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build();
                }
                arrayList3.add(putRecordsResultEntry);
            }
            atomicBoolean.getAndSet(false);
            return PutRecordsResponse.builder().failedRecordCount(100).records(arrayList3).build();
        }).when(this.spyClient)).putRecords((PutRecordsRequest) ArgumentMatchers.any(PutRecordsRequest.class));
        try {
            this.kinesisChangeConsumer.connect();
            this.kinesisChangeConsumer.handleBatch(this.changeEvents, this.committer);
        } catch (Exception e) {
            this.threwException.getAndSet(true);
        }
        Assertions.assertFalse(this.threwException.get());
        Assertions.assertEquals(2, this.counter.get());
        Assertions.assertEquals(arrayList2.size(), arrayList.size());
        for (int i = 0; i < arrayList2.size(); i++) {
            Assertions.assertEquals(((PutRecordsRequestEntry) arrayList.get(i)).data(), ((PutRecordsRequestEntry) arrayList2.get(i)).data());
        }
    }

    @Test
    public void testBatchesAreCorrect() throws Exception {
        new ArrayList();
        String str = "dest1";
        String str2 = "dest2";
        List<ChangeEvent<Object, Object>> createChangeEvents = createChangeEvents(600, "dest1", "dest1");
        createChangeEvents.addAll(createChangeEvents(600, "dest2", "dest2"));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        AtomicInteger atomicInteger3 = new AtomicInteger(0);
        ((KinesisClient) Mockito.doAnswer(invocationOnMock -> {
            ArrayList arrayList = new ArrayList();
            for (PutRecordsRequestEntry putRecordsRequestEntry : ((PutRecordsRequest) invocationOnMock.getArgument(0)).records()) {
                if (putRecordsRequestEntry.partitionKey().equals(str)) {
                    atomicInteger.incrementAndGet();
                } else if (putRecordsRequestEntry.partitionKey().equals(str2)) {
                    atomicInteger2.incrementAndGet();
                }
                arrayList.add((PutRecordsResultEntry) PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build());
            }
            atomicInteger3.incrementAndGet();
            return PutRecordsResponse.builder().failedRecordCount(0).records(arrayList).build();
        }).when(this.spyClient)).putRecords((PutRecordsRequest) ArgumentMatchers.any(PutRecordsRequest.class));
        try {
            this.kinesisChangeConsumer.connect();
            this.kinesisChangeConsumer.handleBatch(createChangeEvents, this.committer);
        } catch (Exception e) {
            this.threwException.getAndSet(true);
        }
        Assertions.assertFalse(this.threwException.get());
        Assertions.assertEquals(600, atomicInteger.get());
        Assertions.assertEquals(600, atomicInteger2.get());
        Assertions.assertEquals(4, atomicInteger3.get());
    }

    @Test
    public void testEmptyRecords() throws Exception {
        ArrayList arrayList = new ArrayList();
        try {
            this.kinesisChangeConsumer.connect();
            this.kinesisChangeConsumer.handleBatch(arrayList, this.committer);
        } catch (Exception e) {
            this.threwException.getAndSet(true);
        }
        Assertions.assertFalse(this.threwException.get());
    }

    @Test
    public void testBatchSplitting() throws Exception {
        List<ChangeEvent<Object, Object>> createChangeEvents = createChangeEvents(1000, "key", "destination");
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        AtomicInteger atomicInteger3 = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        ((KinesisClient) Mockito.doAnswer(invocationOnMock -> {
            ArrayList arrayList = new ArrayList();
            for (PutRecordsRequestEntry putRecordsRequestEntry : ((PutRecordsRequest) invocationOnMock.getArgument(0)).records()) {
                if (atomicBoolean.get()) {
                    atomicInteger2.incrementAndGet();
                } else {
                    atomicInteger3.incrementAndGet();
                }
                arrayList.add((PutRecordsResultEntry) PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build());
            }
            atomicInteger.incrementAndGet();
            atomicBoolean.getAndSet(false);
            return PutRecordsResponse.builder().failedRecordCount(0).records(arrayList).build();
        }).when(this.spyClient)).putRecords((PutRecordsRequest) ArgumentMatchers.any(PutRecordsRequest.class));
        try {
            this.kinesisChangeConsumer.connect();
            this.kinesisChangeConsumer.handleBatch(createChangeEvents, this.committer);
        } catch (Exception e) {
            this.threwException.getAndSet(true);
        }
        Assertions.assertFalse(this.threwException.get());
        Assertions.assertEquals(2, atomicInteger.get());
        Assertions.assertEquals(500, atomicInteger2.get());
        Assertions.assertEquals(500, atomicInteger3.get());
    }
}
