package org.graylog2.outputs;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.graylog.testing.completebackend.apis.GraylogRestApi;
import org.graylog2.Configuration;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.messages.MessageWithIndex;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
import org.graylog2.shared.SuppressForbidden;
import org.graylog2.shared.journal.NoopJournal;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/graylog2/outputs/BlockingBatchedESOutputTest.class */
public class BlockingBatchedESOutputTest {
    private Configuration config;

    @Mock
    private Messages messages;

    @Mock
    private MessageQueueAcknowledger acknowledger;

    @Mock
    private Cluster cluster;
    private BlockingBatchedESOutput output;

    @BeforeEach
    @SuppressForbidden("Using Executors.newSingleThreadExecutor() is okay in tests")
    public void setUp() throws Exception {
        MetricRegistry metricRegistry = new MetricRegistry();
        NoopJournal noopJournal = new NoopJournal();
        this.config = new Configuration() { // from class: org.graylog2.outputs.BlockingBatchedESOutputTest.1
            public int getOutputBatchSize() {
                return 3;
            }

            public int getShutdownTimeout() {
                if ("true".equals(System.getenv("CI"))) {
                    return GraylogRestApi.SLEEP_MS;
                }
                return 100;
            }
        };
        this.output = new BlockingBatchedESOutput(metricRegistry, this.messages, this.config, noopJournal, this.acknowledger, this.cluster, Executors.newSingleThreadScheduledExecutor());
        this.output.initialize();
    }

    @AfterEach
    public void tearDown() {
        this.output.stop();
    }

    @Test
    public void write() throws Exception {
        ((Messages) Mockito.verify(this.messages, Mockito.times(1))).bulkIndex((List) ArgumentMatchers.eq(sendMessages(this.output, this.config.getOutputBatchSize())));
    }

    @Test
    public void forceFlushIfTimedOut() throws Exception {
        List<MessageWithIndex> sendMessages = sendMessages(this.output, this.config.getOutputBatchSize() - 1);
        this.output.forceFlushIfTimedout();
        ((Messages) Mockito.verify(this.messages, Mockito.times(1))).bulkIndex((List) ArgumentMatchers.eq(sendMessages));
    }

    @Test
    public void flushWithService() throws Exception {
        List<MessageWithIndex> sendMessages = sendMessages(this.output, this.config.getOutputBatchSize() - 1);
        Thread.sleep((this.config.getOutputFlushInterval() * 1000) + 100);
        ((Messages) Mockito.verify(this.messages, Mockito.times(1))).bulkIndex((List) ArgumentMatchers.eq(sendMessages));
    }

    @Test
    public void stop_withHealthyCluster() throws Exception {
        Mockito.when(Boolean.valueOf(this.cluster.isConnected())).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.cluster.isDeflectorHealthy())).thenReturn(true);
        List<MessageWithIndex> sendMessages = sendMessages(this.output, this.config.getOutputBatchSize() - 1);
        this.output.stop();
        ((Messages) Mockito.verify(this.messages, Mockito.times(1))).bulkIndex((List) ArgumentMatchers.eq(sendMessages));
    }

    @Timeout(1)
    @Test
    public void stop_withDisconnectedCluster() throws Exception {
        Mockito.when(Boolean.valueOf(this.cluster.isConnected())).thenReturn(false);
        sendMessages(this.output, this.config.getOutputBatchSize() - 1);
        this.output.stop();
        Mockito.verifyNoInteractions(new Object[]{this.messages});
    }

    @Timeout(1)
    @Test
    public void stop_withIndexingBlocked() throws Exception {
        Mockito.when(Boolean.valueOf(this.cluster.isConnected())).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.cluster.isDeflectorHealthy())).thenReturn(true);
        Mockito.when(this.messages.bulkIndex((List) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            new CountDownLatch(1).await();
            return null;
        });
        List<MessageWithIndex> sendMessages = sendMessages(this.output, this.config.getOutputBatchSize() - 1);
        this.output.stop();
        ((Messages) Mockito.verify(this.messages, Mockito.times(1))).bulkIndex((List) ArgumentMatchers.eq(sendMessages));
    }

    private List<MessageWithIndex> buildMessages(int i) {
        ImmutableList.Builder builder = ImmutableList.builder();
        for (int i2 = 0; i2 < i; i2++) {
            builder.add(new MessageWithIndex(new Message("message" + i2, "test", Tools.nowUTC()), (IndexSet) Mockito.mock(IndexSet.class)));
        }
        return builder.build();
    }

    private List<MessageWithIndex> sendMessages(BlockingBatchedESOutput blockingBatchedESOutput, int i) throws Exception {
        List<MessageWithIndex> buildMessages = buildMessages(i);
        Iterator<MessageWithIndex> it = buildMessages.iterator();
        while (it.hasNext()) {
            blockingBatchedESOutput.writeMessageEntry(it.next());
        }
        return buildMessages;
    }
}
