package io.reactiverse.awssdk.integration.firehose;

import cloud.localstack.docker.LocalstackDocker;
import cloud.localstack.docker.LocalstackDockerExtension;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
import io.reactiverse.awssdk.VertxSdkClient;
import io.reactiverse.awssdk.integration.LocalStackBaseSpec;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.junit5.Timeout;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.reactivex.RxHelper;
import java.net.URI;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
import software.amazon.awssdk.services.firehose.FirehoseClient;
import software.amazon.awssdk.services.firehose.model.DeliveryStreamDescription;
import software.amazon.awssdk.services.firehose.model.DeliveryStreamStatus;
import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordResponse;
import software.amazon.awssdk.services.firehose.model.Record;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;

@EnabledIfSystemProperty(named = "tests.integration", matches = "localstack")
@Extensions({@ExtendWith({VertxExtension.class}), @ExtendWith({LocalstackDockerExtension.class})})
@LocalstackDockerProperties(services = {"firehose", "s3"})
/* loaded from: input_file:io/reactiverse/awssdk/integration/firehose/VertxFirehoseClientSpec.class */
public class VertxFirehoseClientSpec extends LocalStackBaseSpec {
    private static final String STREAM = "My-Vertx-Firehose-Stream";
    private static final String BUCKET = "firehose-bucket";
    private FirehoseAsyncClient firehoseClient;
    private S3AsyncClient s3Client;
    private static final DeliveryStreamType STREAM_TYPE = DeliveryStreamType.DIRECT_PUT;
    private static final JsonObject FAKE_DATA = new JsonObject().put("producer", "vert.x");

    @BeforeAll
    public static void createStream() throws Exception {
        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
        S3Client s3Client = (S3Client) S3Client.builder().credentialsProvider(credentialsProvider).region(Region.EU_WEST_1).endpointOverride(s3URI()).build();
        s3Client.createBucket(builder -> {
            builder.bucket(BUCKET).acl("public-read-write");
        });
        Assertions.assertTrue(s3Client.listObjects(builder2 -> {
            builder2.bucket(BUCKET);
        }).contents().isEmpty());
        FirehoseClient firehoseClient = (FirehoseClient) FirehoseClient.builder().region(Region.EU_WEST_1).credentialsProvider(credentialsProvider).endpointOverride(getFirehoseURI()).build();
        Assertions.assertNotNull(firehoseClient.createDeliveryStream(builder3 -> {
            builder3.deliveryStreamName(STREAM).deliveryStreamType(STREAM_TYPE).s3DestinationConfiguration(builder3 -> {
                builder3.bucketARN("arn:aws:s3:::firehose-bucket");
            });
        }));
        boolean z = false;
        while (!z) {
            Thread.sleep(1000L);
            DeliveryStreamDescription deliveryStreamDescription = firehoseClient.describeDeliveryStream(builder4 -> {
                builder4.deliveryStreamName(STREAM);
            }).deliveryStreamDescription();
            z = deliveryStreamDescription.deliveryStreamStatus().equals(DeliveryStreamStatus.ACTIVE);
            Assertions.assertEquals(STREAM_TYPE, deliveryStreamDescription.deliveryStreamType());
            Assertions.assertEquals(STREAM, deliveryStreamDescription.deliveryStreamName());
        }
    }

    @Timeout(value = 25, timeUnit = TimeUnit.SECONDS)
    @Test
    public void testPublish(Vertx vertx, VertxTestContext vertxTestContext) throws Exception {
        Context orCreateContext = vertx.getOrCreateContext();
        createS3Client(orCreateContext);
        createFirehoseClient(orCreateContext);
        Single flatMap = publishTestData().delay(5L, TimeUnit.SECONDS, RxHelper.scheduler(vertx)).flatMap(putRecordResponse -> {
            assertContext(vertx, orCreateContext, vertxTestContext);
            return lists3Files();
        });
        Consumer consumer = listObjectsResponse -> {
            assertContext(vertx, orCreateContext, vertxTestContext);
            List contents = listObjectsResponse.contents();
            vertxTestContext.verify(() -> {
                Assertions.assertEquals(1, contents.size());
                vertxTestContext.completeNow();
            });
        };
        vertxTestContext.getClass();
        flatMap.subscribe(consumer, vertxTestContext::failNow);
    }

    private Single<ListObjectsResponse> lists3Files() {
        return single(this.s3Client.listObjects(builder -> {
            builder.bucket(BUCKET);
        }));
    }

    private Single<ResponseBytes<GetObjectResponse>> getFile(String str, String str2) {
        LoggerFactory.getLogger(VertxFirehoseClientSpec.class).error("GETTING {} from S3", str2);
        return single(this.s3Client.getObject(builder -> {
            builder.bucket(str).key(str2).ifModifiedSince(Instant.now().minus(2L, (TemporalUnit) ChronoUnit.HOURS));
        }, AsyncResponseTransformer.toBytes()));
    }

    private Single<PutRecordResponse> publishTestData() {
        return single(this.firehoseClient.putRecord(VertxFirehoseClientSpec::testRecord));
    }

    private static PutRecordRequest.Builder testRecord(PutRecordRequest.Builder builder) {
        return builder.deliveryStreamName(STREAM).record((Record) Record.builder().data(SdkBytes.fromUtf8String(FAKE_DATA.encode())).build());
    }

    private void createS3Client(Context context) throws Exception {
        this.s3Client = s3(context);
    }

    private void createFirehoseClient(Context context) throws Exception {
        this.firehoseClient = (FirehoseAsyncClient) VertxSdkClient.withVertx(FirehoseAsyncClient.builder().region(Region.EU_WEST_1).credentialsProvider(credentialsProvider).endpointOverride(getFirehoseURI()), context).build();
    }

    private static URI getFirehoseURI() throws Exception {
        return new URI(LocalstackDocker.INSTANCE.getEndpointFirehose());
    }
}
