package org.graylog.failure;

import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.graylog2.Configuration;
import org.graylog2.indexer.messages.Indexable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/graylog/failure/FailureSubmissionQueueTest.class */
/**
 * Tests for the blocking submit/consume operations of {@link FailureSubmissionQueue}.
 *
 * <p>Several tests use a single-threaded scheduler to act on the queue from a second
 * thread after a fixed delay, while the test thread is blocked in a queue call. The
 * futures returned by the scheduler are awaited so that anything thrown on the second
 * thread (including assertion failures) fails the test instead of being lost.
 */
public class FailureSubmissionQueueTest {
    /** Delay before the second thread touches the queue. */
    private static final long ACTION_DELAY_MS = 300L;
    /** Lower bound on how long the test thread must have been blocked; kept below the delay to tolerate jitter. */
    private static final long MIN_BLOCKED_MS = 200L;
    /** Upper bound on waiting for a scheduled action that should already be (nearly) finished. */
    private static final long FUTURE_TIMEOUT_SECONDS = 5L;

    private final Configuration configuration = (Configuration) Mockito.mock(Configuration.class);
    private final MetricRegistry metricRegistry = new MetricRegistry();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("failure-scheduled-%d").setDaemon(false).build());

    /**
     * Stops the scheduler created for this test instance. JUnit builds a fresh instance
     * (and therefore a fresh pool with a non-daemon thread) per test method, so without
     * this every test would leak a live thread. {@code shutdownNow()} also cancels delayed
     * tasks that have not started yet.
     */
    @After
    public void shutDownScheduler() {
        this.scheduler.shutdownNow();
    }

    @Test
    public void submitBlocking_whenQueueNotFull_acceptsNewBatches() throws Exception {
        final FailureSubmissionQueue queue = newQueueWithCapacity(1000);
        final ProcessingFailure first = createProcessingFailure();
        final ProcessingFailure second = createProcessingFailure();

        queue.submitBlocking(FailureBatch.processingFailureBatch(first));
        queue.submitBlocking(FailureBatch.processingFailureBatch(second));

        Assertions.assertThat(queue.queueSize()).isEqualTo(2);
        // Batches come back in submission order.
        Assertions.assertThat(queue.consumeBlocking()).isEqualTo(FailureBatch.processingFailureBatch(first));
        Assertions.assertThat(queue.consumeBlocking()).isEqualTo(FailureBatch.processingFailureBatch(second));
    }

    @Test
    public void submitBlocking_whenQueueIsFull_submissionIsBlocked() throws Exception {
        final FailureSubmissionQueue queue = newQueueWithCapacity(2);
        final ProcessingFailure first = createProcessingFailure();
        final ProcessingFailure second = createProcessingFailure();
        final ProcessingFailure third = createProcessingFailure();

        // Fill the queue to capacity.
        queue.submitBlocking(FailureBatch.processingFailureBatch(first));
        queue.submitBlocking(FailureBatch.processingFailureBatch(second));

        // After the delay, a second thread frees one slot. The lambda returns a value so
        // it is scheduled as a Callable and may throw InterruptedException directly.
        final ScheduledFuture<?> consumer = this.scheduler.schedule(() -> {
            Assertions.assertThat(queue.queueSize()).isEqualTo(2);
            Assertions.assertThat(queue.consumeBlocking()).isEqualTo(FailureBatch.processingFailureBatch(first));
            return null;
        }, ACTION_DELAY_MS, TimeUnit.MILLISECONDS);

        final long startNanos = System.nanoTime();
        queue.submitBlocking(FailureBatch.processingFailureBatch(third));
        Assertions.assertThat(elapsedMillisSince(startNanos)).isGreaterThan(MIN_BLOCKED_MS);

        // Surface any failure raised on the scheduler thread.
        consumer.get(FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        Assertions.assertThat(queue.queueSize()).isEqualTo(2);
        Assertions.assertThat(queue.consumeBlocking()).isEqualTo(FailureBatch.processingFailureBatch(second));
        Assertions.assertThat(queue.consumeBlocking()).isEqualTo(FailureBatch.processingFailureBatch(third));
    }

    @Test
    public void consumeBlocking_waitsForBatch_whenQueueIsEmpty() throws Exception {
        final FailureSubmissionQueue queue = newQueueWithCapacity(2);
        final ProcessingFailure failure = createProcessingFailure();

        final ScheduledFuture<?> producer = scheduleSubmission(queue, failure);

        final long startNanos = System.nanoTime();
        final FailureBatch consumed = queue.consumeBlocking();
        Assertions.assertThat(elapsedMillisSince(startNanos)).isGreaterThan(MIN_BLOCKED_MS);

        // Surface any failure raised on the scheduler thread.
        producer.get(FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        Assertions.assertThat(consumed).isEqualTo(FailureBatch.processingFailureBatch(failure));
    }

    @Test
    public void consumeBlockingWithTimeout_returnsBatch_whenSubmittedWithinWaitingTimeout() throws Exception {
        final FailureSubmissionQueue queue = newQueueWithCapacity(2);
        final ProcessingFailure failure = createProcessingFailure();

        final ScheduledFuture<?> producer = scheduleSubmission(queue, failure);

        final long startNanos = System.nanoTime();
        final FailureBatch consumed = queue.consumeBlockingWithTimeout(500L);
        Assertions.assertThat(elapsedMillisSince(startNanos)).isGreaterThan(MIN_BLOCKED_MS);

        // Surface any failure raised on the scheduler thread.
        producer.get(FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        Assertions.assertThat(consumed).isEqualTo(FailureBatch.processingFailureBatch(failure));
    }

    @Test
    public void consumeBlockingWithTimeout_returnsNull_whenReachedWaitingTimeout() throws Exception {
        final FailureSubmissionQueue queue = newQueueWithCapacity(2);
        final ProcessingFailure failure = createProcessingFailure();

        // The submission is scheduled later than the consume timeout below, so it must not
        // be observed. The future is intentionally not awaited; the pending task is
        // cancelled when the scheduler is shut down after the test.
        scheduleSubmission(queue, failure);

        Assertions.assertThat(queue.consumeBlockingWithTimeout(50L)).isNull();
    }

    /** Stubs the configured queue capacity and builds a queue from the shared mocks. */
    private FailureSubmissionQueue newQueueWithCapacity(int capacity) {
        Mockito.when(Integer.valueOf(this.configuration.getFailureHandlingQueueCapacity())).thenReturn(capacity);
        return new FailureSubmissionQueue(this.configuration, this.metricRegistry);
    }

    /**
     * Schedules a blocking submission of a single-failure batch on the scheduler thread
     * after {@link #ACTION_DELAY_MS}. Scheduled as a Callable so an InterruptedException
     * propagates through the returned future rather than being swallowed.
     */
    private ScheduledFuture<?> scheduleSubmission(FailureSubmissionQueue queue, ProcessingFailure failure) {
        return this.scheduler.schedule(() -> {
            queue.submitBlocking(FailureBatch.processingFailureBatch(failure));
            return null;
        }, ACTION_DELAY_MS, TimeUnit.MILLISECONDS);
    }

    /** Milliseconds elapsed since {@code startNanos}, measured on the monotonic clock. */
    private static long elapsedMillisSince(long startNanos) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    /** Builds a processing failure with fixed text fields, the current UTC time and no failed message. */
    private ProcessingFailure createProcessingFailure() {
        return new ProcessingFailure(ProcessingFailureCause.UNKNOWN, "message", "details", DateTime.now(DateTimeZone.UTC), (Indexable) null, true);
    }
}
