package org.graylog.integrations.aws.service;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.assertj.core.api.AssertionsForClassTypes;
import org.graylog.integrations.aws.AWSClientBuilderUtil;
import org.graylog.integrations.aws.AWSLogMessage;
import org.graylog.integrations.aws.AWSMessageType;
import org.graylog.integrations.aws.AWSTestingUtils;
import org.graylog.integrations.aws.resources.requests.AWSRequest;
import org.graylog.integrations.aws.resources.requests.AWSRequestImpl;
import org.graylog.integrations.aws.resources.requests.KinesisHealthCheckRequest;
import org.graylog.integrations.aws.resources.requests.KinesisNewStreamRequest;
import org.graylog.integrations.aws.resources.responses.KinesisHealthCheckResponse;
import org.graylog.integrations.aws.resources.responses.StreamsResponse;
import org.graylog2.security.encryption.EncryptedValue;
import org.graylog2.shared.bindings.providers.ObjectMapperProvider;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.iam.IamClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

/* loaded from: input_file:org/graylog/integrations/aws/service/KinesisServiceTest.class */
/**
 * Unit tests for {@link KinesisService}.
 *
 * <p>Every AWS interaction goes through a mocked {@link KinesisClient}, which the mocked
 * {@link AWSClientBuilderUtil} hands to the service; no real AWS call is made.
 */
public class KinesisServiceTest {
    private static final String TEST_STREAM_1 = "test-stream-1";
    private static final String TEST_STREAM_2 = "test-stream-2";
    private static final String[] TWO_TEST_STREAMS = {TEST_STREAM_1, TEST_STREAM_2};
    private static final String TEST_REGION = Region.EU_WEST_1.id();
    private static final int SHARD_COUNT = 1;
    private static final String STREAM_ARN = "test-stream-arn";

    /** Fields shared by all flow log samples; each sample appends its own trailing fields. */
    private static final String FLOW_LOG_PREFIX =
            "2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ";

    @Rule
    public MockitoRule mockitoRule = MockitoJUnit.rule();

    @Mock
    private KinesisClient kinesisClient;

    @Mock
    private AWSClientBuilderUtil awsClientBuilderUtil;

    @Mock
    private EncryptedValue encryptedValue;

    private KinesisService kinesisService;

    /** Creates the service under test with mocked client builders and the test codecs. */
    @Before
    public void setUp() {
        kinesisService = new KinesisService(
                Mockito.mock(IamClientBuilder.class),
                Mockito.mock(KinesisClientBuilder.class),
                new ObjectMapperProvider().get(),
                AWSTestingUtils.buildTestCodecs(),
                awsClientBuilderUtil);
    }

    /** Checks which {@link AWSMessageType} is detected for several sample log lines. */
    @Test
    public void testLogIdentification() {
        Assert.assertEquals(AWSMessageType.KINESIS_CLOUDWATCH_FLOW_LOGS,
                new AWSLogMessage(FLOW_LOG_PREFIX + "ACCEPT OK").detectLogMessageType(true));
        Assert.assertEquals(AWSMessageType.KINESIS_CLOUDWATCH_FLOW_LOGS,
                new AWSLogMessage(FLOW_LOG_PREFIX + "REJECT OK").detectLogMessageType(true));
        // One trailing field too many.
        Assert.assertEquals(AWSMessageType.KINESIS_RAW,
                new AWSLogMessage(FLOW_LOG_PREFIX + "REJECT OK ONE-MORE-WORD").detectLogMessageType(false));
        // One trailing field too few.
        Assert.assertEquals(AWSMessageType.KINESIS_RAW,
                new AWSLogMessage(FLOW_LOG_PREFIX + "REJECT").detectLogMessageType(false));
        Assert.assertEquals(AWSMessageType.KINESIS_RAW,
                new AWSLogMessage("haha this is not a real log message").detectLogMessageType(false));
    }

    /** Health check on a CloudWatch flow log payload: verifies the detected type and parsed fields. */
    @Test
    public void healthCheckCloudWatchFlowLog() throws ExecutionException, IOException {
        KinesisHealthCheckResponse response =
                executeHealthCheckTest(AWSTestingUtils.cloudWatchFlowLogPayload(), Instant.now());
        Assert.assertEquals(AWSMessageType.KINESIS_CLOUDWATCH_FLOW_LOGS, response.inputType());

        Map<String, Object> messageFields = response.messageFields();
        Assert.assertEquals(AWSTestingUtils.CLOUD_WATCH_TIMESTAMP, messageFields.get("timestamp"));
        Assert.assertEquals(21L, messageFields.size());
        Assert.assertEquals(6, messageFields.get("protocol_number"));
        Assert.assertEquals("TCP", messageFields.get("protocol"));
        Assert.assertEquals(1L, messageFields.get("packets"));
        Assert.assertEquals("172.1.1.2", messageFields.get("dst_addr"));
    }

    /** Health check on a raw CloudWatch payload: verifies the detected type, timestamp and field count. */
    @Test
    public void healthCheckCloudWatchRaw() throws ExecutionException, IOException {
        KinesisHealthCheckResponse response =
                executeHealthCheckTest(AWSTestingUtils.cloudWatchRawPayload(), Instant.now());
        Assert.assertEquals(AWSMessageType.KINESIS_CLOUDWATCH_RAW, response.inputType());

        Map<String, Object> messageFields = response.messageFields();
        Assert.assertEquals(AWSTestingUtils.CLOUD_WATCH_TIMESTAMP, messageFields.get("timestamp"));
        Assert.assertEquals(7L, messageFields.size());
    }

    /**
     * Health check on a plain-text Kinesis record: the message timestamp is expected to equal the
     * record's approximate arrival timestamp.
     */
    @Test
    public void healthCheckRawKinesisLog() throws ExecutionException, IOException {
        byte[] payload = "This is a test raw log".getBytes(StandardCharsets.UTF_8);
        Instant arrivalTime =
                Instant.ofEpochMilli(new DateTime(2000, 1, 1, 1, 1, 1, DateTimeZone.UTC).getMillis());

        KinesisHealthCheckResponse response = executeHealthCheckTest(payload, arrivalTime);
        Assert.assertEquals(AWSMessageType.KINESIS_RAW, response.inputType());

        Map<String, Object> messageFields = response.messageFields();
        Assert.assertEquals(new DateTime("2000-01-01T01:01:01.000Z", DateTimeZone.UTC), messageFields.get("timestamp"));
        Assert.assertEquals(5L, messageFields.size());
    }

    /**
     * Stubs the Kinesis client so that it serves a single record carrying the given payload and
     * runs the health check against {@code TEST_STREAM_1}.
     *
     * @param bArr    raw bytes used as the record data
     * @param instant approximate arrival timestamp assigned to the record
     * @return the response produced by {@link KinesisService#healthCheck}
     */
    private KinesisHealthCheckResponse executeHealthCheckTest(byte[] bArr, Instant instant) throws IOException, ExecutionException {
        stubClientBuilder();
        Mockito.when(kinesisClient.listStreams(ArgumentMatchers.isA(ListStreamsRequest.class)))
                .thenReturn(streamsPage(false));
        stubShardLookup();

        Record record = Record.builder()
                .approximateArrivalTimestamp(instant)
                .data(SdkBytes.fromByteArray(bArr))
                .build();
        // The first read reports that the shard is still behind; the second reports it has caught up.
        Mockito.when(kinesisClient.getRecords(ArgumentMatchers.isA(GetRecordsRequest.class)))
                .thenReturn(GetRecordsResponse.builder().records(record).millisBehindLatest(10000L).build())
                .thenReturn(GetRecordsResponse.builder().records(record).millisBehindLatest(0L).build());

        // The casts are kept because the generic signature of the request builders is not visible here.
        return kinesisService.healthCheck(((KinesisHealthCheckRequest.Builder) ((KinesisHealthCheckRequest.Builder) ((KinesisHealthCheckRequest.Builder) KinesisHealthCheckRequest.builder().region(Region.EU_WEST_1.id())).awsAccessKeyId("a-key")).awsSecretAccessKey(encryptedValue)).streamName(TEST_STREAM_1).build());
    }

    /** Verifies stream name listing for a single page and for two consecutive pages. */
    @Test
    public void testGetStreams() throws ExecutionException {
        stubClientBuilder();

        // One page with two streams.
        Mockito.when(kinesisClient.listStreams(ArgumentMatchers.isA(ListStreamsRequest.class)))
                .thenReturn(streamsPage(false));
        StreamsResponse singlePage = fetchStreamNames();
        Assert.assertEquals(2L, singlePage.total());
        Assert.assertEquals(2L, singlePage.streams().size());

        // Two pages with two streams each: the first page announces that more streams follow.
        Mockito.when(kinesisClient.listStreams(ArgumentMatchers.isA(ListStreamsRequest.class)))
                .thenReturn(streamsPage(true))
                .thenReturn(streamsPage(false));
        StreamsResponse twoPages = fetchStreamNames();
        Assert.assertEquals(4L, twoPages.total());
        Assert.assertEquals(4L, twoPages.streams().size());
    }

    /** Verifies that an empty list is rejected and that a non-empty list yields a record. */
    @Test
    public void testSelectRandomRecord() {
        List<Record> records = new ArrayList<>();
        AssertionsForClassTypes.assertThatThrownBy(() -> kinesisService.selectRandomRecord(records))
                .isExactlyInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining("Records list can not be empty.");

        records.add(Record.builder().build());
        records.add(Record.builder().build());
        records.add(Record.builder().build());
        Assert.assertNotNull(kinesisService.selectRandomRecord(records));
    }

    /**
     * Verifies that exactly ten records are returned when every read yields one record and
     * always reports the shard as behind.
     */
    @Test
    public void testRetrieveRecords() throws IOException {
        stubShardLookup();

        Record record = Record.builder()
                .approximateArrivalTimestamp(Instant.now())
                .data(SdkBytes.fromByteArray(AWSTestingUtils.cloudWatchRawPayload()))
                .build();
        GetRecordsResponse getRecordsResponse =
                GetRecordsResponse.builder().records(record).millisBehindLatest(10000L).build();
        // A single stubbed answer is returned for every invocation, however many reads occur.
        Mockito.when(kinesisClient.getRecords(ArgumentMatchers.isA(GetRecordsRequest.class)))
                .thenReturn(getRecordsResponse);

        Assert.assertEquals(10L, kinesisService.retrieveRecords("kinesisStream", kinesisClient).size());
    }

    /** Verifies the result message returned after creating a new stream. */
    @Test
    public void testCreateNewKinesisStream() {
        stubClientBuilder();
        Mockito.when(kinesisClient.createStream(ArgumentMatchers.isA(CreateStreamRequest.class)))
                .thenReturn(CreateStreamResponse.builder().build());
        StreamDescription description = StreamDescription.builder()
                .streamName(TEST_STREAM_1)
                .streamStatus(StreamStatus.ACTIVE)
                .streamARN(STREAM_ARN)
                .build();
        Mockito.when(kinesisClient.describeStream(ArgumentMatchers.isA(DescribeStreamRequest.class)))
                .thenReturn(DescribeStreamResponse.builder().streamDescription(description).build());

        String expectedResult = "Success. The new stream [" + TEST_STREAM_1 + "/" + STREAM_ARN
                + "] was created with [" + SHARD_COUNT + "] shard.";
        Assert.assertEquals(expectedResult, kinesisService.createNewKinesisStream(((KinesisNewStreamRequest.Builder) ((KinesisNewStreamRequest.Builder) ((KinesisNewStreamRequest.Builder) KinesisNewStreamRequest.builder().region(Region.EU_WEST_1.id())).awsAccessKeyId("a-key")).awsSecretAccessKey(encryptedValue)).streamName(TEST_STREAM_1).build()).result());
    }

    /** Makes the mocked builder util return the mocked Kinesis client for any request. */
    private void stubClientBuilder() {
        Mockito.when(awsClientBuilderUtil.buildClient(ArgumentMatchers.any(KinesisClientBuilder.class), (AWSRequest) ArgumentMatchers.any()))
                .thenReturn(kinesisClient);
    }

    /** Stubs shard listing (one shard) and shard iterator retrieval on the mocked client. */
    private void stubShardLookup() {
        Mockito.when(kinesisClient.listShards(ArgumentMatchers.isA(ListShardsRequest.class)))
                .thenReturn(ListShardsResponse.builder().shards(Shard.builder().shardId("shardId-1234").build()).build());
        Mockito.when(kinesisClient.getShardIterator(ArgumentMatchers.isA(GetShardIteratorRequest.class)))
                .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
    }

    /** Builds a stream listing page that contains both test streams. */
    private static ListStreamsResponse streamsPage(boolean hasMoreStreams) {
        return ListStreamsResponse.builder().streamNames(TWO_TEST_STREAMS).hasMoreStreams(hasMoreStreams).build();
    }

    /** Requests the stream names from the service for the test region and credentials. */
    private StreamsResponse fetchStreamNames() throws ExecutionException {
        return kinesisService.getKinesisStreamNames(((AWSRequestImpl.Builder) ((AWSRequestImpl.Builder) ((AWSRequestImpl.Builder) AWSRequestImpl.builder().region(TEST_REGION)).awsAccessKeyId("a-key")).awsSecretAccessKey(encryptedValue)).build());
    }
}
