package io.reactiverse.awssdk.integration.kinesis;

import cloud.localstack.docker.LocalstackDocker;
import cloud.localstack.docker.LocalstackDockerExtension;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
import io.reactiverse.awssdk.VertxSdkClient;
import io.reactiverse.awssdk.integration.LocalStackBaseSpec;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.junit5.Timeout;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

@EnabledIfSystemProperty(named = "tests.integration", matches = "localstack")
@Extensions({@ExtendWith({VertxExtension.class}), @ExtendWith({LocalstackDockerExtension.class})})
@LocalstackDockerProperties(services = {"kinesis"})
/* loaded from: input_file:io/reactiverse/awssdk/integration/kinesis/VertxKinesisClientSpec.class */
public class VertxKinesisClientSpec extends LocalStackBaseSpec {
    private static final String STREAM = "my-awesome-stream";
    private static final SdkBytes DATA = SdkBytes.fromByteArray("Hello".getBytes());
    private String currentShardIterator;
    private long pollTimer = -1;
    private long streamTimer = -1;

    @BeforeAll
    public static void createStream() throws Exception {
        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
        KinesisClient kinesisClient = (KinesisClient) KinesisClient.builder().region(Region.EU_WEST_1).credentialsProvider(credentialsProvider).endpointOverride(new URI(LocalstackDocker.INSTANCE.getEndpointKinesis())).build();
        Assertions.assertNotNull(kinesisClient.createStream(builder -> {
            builder.streamName(STREAM).shardCount(1);
        }));
        boolean z = false;
        while (!z) {
            Thread.sleep(1000L);
            StreamDescription streamDescription = kinesisClient.describeStream(builder2 -> {
                builder2.streamName(STREAM);
            }).streamDescription();
            z = streamDescription.streamStatus().equals(StreamStatus.ACTIVE);
            if (z) {
                Assertions.assertEquals(1, streamDescription.shards().size());
            }
        }
    }

    @Timeout(value = 15, timeUnit = TimeUnit.SECONDS)
    @Test
    public void testPubSub(Vertx vertx, VertxTestContext vertxTestContext) throws Exception {
        Context orCreateContext = vertx.getOrCreateContext();
        KinesisAsyncClient kinesis = kinesis(orCreateContext);
        Single flatMap = single(kinesis.describeStream(this::streamDesc)).flatMap(describeStreamResponse -> {
            assertContext(vertx, orCreateContext, vertxTestContext);
            return single(kinesis.getShardIterator(shardIterator(((Shard) describeStreamResponse.streamDescription().shards().get(0)).shardId())));
        });
        Consumer consumer = getShardIteratorResponse -> {
            assertContext(vertx, orCreateContext, vertxTestContext);
            startPolling(vertx, vertxTestContext, kinesis, orCreateContext, getShardIteratorResponse.shardIterator());
            publishTestRecord(kinesis);
        };
        vertxTestContext.getClass();
        flatMap.subscribe(consumer, vertxTestContext::failNow);
    }

    private CompletableFuture<PutRecordResponse> publishTestRecord(KinesisAsyncClient kinesisAsyncClient) {
        return kinesisAsyncClient.putRecord(builder -> {
            builder.streamName(STREAM).partitionKey("Hello").data(DATA);
        });
    }

    private void startPolling(Vertx vertx, VertxTestContext vertxTestContext, KinesisAsyncClient kinesisAsyncClient, Context context, String str) {
        this.currentShardIterator = str;
        vertx.setPeriodic(1000L, l -> {
            this.pollTimer = l.longValue();
            kinesisAsyncClient.getRecords(builder -> {
                builder.shardIterator(this.currentShardIterator).limit(1);
            }).handle((getRecordsResponse, th) -> {
                if (th != null) {
                    vertxTestContext.failNow(th);
                    return null;
                }
                List records = getRecordsResponse.records();
                if (records.size() <= 0) {
                    this.currentShardIterator = getRecordsResponse.nextShardIterator();
                    return null;
                }
                vertxTestContext.verify(() -> {
                    Assertions.assertEquals(1, records.size());
                    Assertions.assertEquals(DATA, ((Record) records.get(0)).data());
                });
                if (this.pollTimer > -1) {
                    vertx.cancelTimer(this.pollTimer);
                }
                vertxTestContext.completeNow();
                return null;
            });
        });
    }

    private void streamDesc(DescribeStreamRequest.Builder builder) {
        builder.streamName(STREAM);
    }

    private java.util.function.Consumer<GetShardIteratorRequest.Builder> shardIterator(String str) {
        return builder -> {
            builder.streamName(STREAM).shardIteratorType(ShardIteratorType.LATEST).shardId(str);
        };
    }

    private KinesisAsyncClient kinesis(Context context) throws Exception {
        return (KinesisAsyncClient) VertxSdkClient.withVertx(KinesisAsyncClient.builder().region(Region.EU_WEST_1).endpointOverride(new URI(LocalstackDocker.INSTANCE.getEndpointKinesis())).credentialsProvider(credentialsProvider), context).build();
    }
}
