package org.graylog2.indexer.messages;

import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import joptsimple.internal.Strings;
import org.assertj.core.api.Assertions;
import org.graylog.testing.elasticsearch.ElasticsearchBaseTest;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.TestIndexSet;
import org.graylog2.indexer.indexset.IndexSetConfig;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategy;
import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategyConfig;
import org.graylog2.indexer.rotation.strategies.MessageCountRotationStrategy;
import org.graylog2.indexer.rotation.strategies.MessageCountRotationStrategyConfig;
import org.graylog2.plugin.Message;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Integration tests for {@link Messages} that run against the Elasticsearch instance provided by
 * {@link ElasticsearchBaseTest}.
 *
 * <p>Concrete subclasses supply the storage specific pieces: the {@link MessagesAdapter} under
 * test, a way to index a single raw document and a way to count the documents in an index.
 */
public abstract class MessagesIT extends ElasticsearchBaseTest {
    /** Index every bulk request in this class is expected to end up in. */
    private static final String INDEX_NAME = "messages_it_deflector";

    /** Number of characters in a one mebibyte message payload. */
    private static final int ONE_MEBIBYTE = 1024 * 1024;

    /** Upper bound for waiting on work that runs on the background indexing thread. */
    private static final long BACKGROUND_TIMEOUT_MINUTES = 3L;

    protected Messages messages;

    private static final IndexSetConfig indexSetConfig = IndexSetConfig.builder()
            .id("index-set-1")
            .title("Index set 1")
            .description("For testing")
            .indexPrefix("messages_it")
            .creationDate(ZonedDateTime.now(ZoneOffset.UTC))
            .shards(1)
            .replicas(0)
            .rotationStrategyClass(MessageCountRotationStrategy.class.getCanonicalName())
            .rotationStrategy(MessageCountRotationStrategyConfig.createDefault())
            .retentionStrategyClass(DeletionRetentionStrategy.class.getCanonicalName())
            .retentionStrategy(DeletionRetentionStrategyConfig.createDefault())
            .indexAnalyzer("standard")
            .indexTemplateName("template-1")
            .indexOptimizationMaxNumSegments(1)
            .indexOptimizationDisabled(false)
            .build();
    private static final IndexSet indexSet = new TestIndexSet(indexSetConfig);

    /**
     * Creates the storage specific adapter that the {@link Messages} instance under test delegates to.
     *
     * @param metricRegistry registry handed to the adapter; a fresh one is created for every test
     * @return the adapter to test against
     */
    protected abstract MessagesAdapter createMessagesAdapter(MetricRegistry metricRegistry);

    /**
     * Recreates {@link #INDEX_NAME}, waits for it to become green and builds a fresh
     * {@link Messages} instance with mocked collaborators.
     */
    @Before
    public void setUp() throws Exception {
        client().deleteIndices(INDEX_NAME);
        client().createIndex(INDEX_NAME);
        client().waitForGreenStatus(INDEX_NAME);
        this.messages = new Messages(
                Mockito.mock(TrafficAccounting.class),
                createMessagesAdapter(new MetricRegistry()),
                Mockito.mock(ProcessingStatusRecorder.class));
    }

    @After
    public void tearDown() {
        client().cleanUp();
    }

    /**
     * Indexes a single raw document, bypassing {@link Messages}.
     *
     * @param index  name of the index to write to
     * @param source fields of the document
     * @param id     id to store the document under
     * @return {@code true} if the document was indexed
     */
    protected abstract boolean indexMessage(String index, Map<String, Object> source, String id);

    /**
     * Counts the documents in an index.
     *
     * @param indexName name of the index to inspect
     * @return number of documents found
     */
    protected abstract long messageCount(String indexName);

    @Test
    public void getRetrievesPreviouslyStoredMessage() throws Exception {
        final String index = UUID.randomUUID().toString();
        client().createIndex(index);

        final Map<String, Object> source = new HashMap<>();
        source.put("message", "This is my message");
        source.put("source", "logsender");
        source.put("timestamp", "2017-04-13 15:29:00.000");
        Assertions.assertThat(indexMessage(index, source, "1")).isTrue();

        final Message stored = this.messages.get("1", index).getMessage();

        Assertions.assertThat(stored).isNotNull();
        Assertions.assertThat(stored.getMessage()).isEqualTo("This is my message");
        Assertions.assertThat(stored.getSource()).isEqualTo("logsender");
        Assertions.assertThat(stored.getTimestamp()).isEqualTo(DateTime.parse("2017-04-13T15:29:00.000Z"));
    }

    @Test
    public void analyzingFieldReturnsTokens() throws IOException {
        final String index = client().createRandomIndex("analyze-");

        Assertions.assertThat(this.messages.analyze("The quick brown fox jumps over the lazy dog", index, "standard"))
                .containsExactlyInAnyOrder("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog");
    }

    @Test
    public void testIfTooLargeBatchesGetSplitUp() throws Exception {
        final int messageCount = 101;
        final List<Map.Entry<IndexSet, Message>> largeBatch = createMessageBatch(ONE_MEBIBYTE, messageCount);

        Assertions.assertThat(this.messages.bulkIndex(largeBatch)).isEmpty();

        // Refresh explicitly, like the other tests do, instead of sleeping and hoping that a
        // periodic refresh has made the documents visible in the meantime.
        client().refreshNode();
        Assertions.assertThat(messageCount(INDEX_NAME)).isEqualTo(messageCount);
    }

    @Test
    public void unevenTooLargeBatchesGetSplitUp() throws Exception {
        // 100 small messages followed by 20 messages of five mebibytes each.
        final List<Map.Entry<IndexSet, Message>> batch = createMessageBatch(1024, 100);
        batch.addAll(createMessageBatch(5 * ONE_MEBIBYTE, 20));

        Assertions.assertThat(this.messages.bulkIndex(batch)).isEmpty();

        client().refreshNode();
        Assertions.assertThat(messageCount(INDEX_NAME)).isEqualTo(120L);
    }

    @Test
    public void conflictingFieldTypesErrorAreReported() throws Exception {
        // The same custom field carries a number in the first message and text in the second.
        final Message numeric = new Message("One message", "loghost-a", now());
        numeric.addField("_ourcustomfield", 42);
        final Message textual = new Message("Another message", "loghost-b", now());
        textual.addField("_ourcustomfield", "fourty-two");

        final List<String> failedItems = this.messages.bulkIndex(
                ImmutableList.of(entry(indexSet, numeric), entry(indexSet, textual)));

        Assertions.assertThat(failedItems).hasSize(1);
        Assertions.assertThat((List<?>) this.messages.getIndexFailureQueue().poll(1L, TimeUnit.SECONDS)).hasSize(1);
    }

    @Test
    public void retryIndexingMessagesDuringFloodStage() throws Exception {
        final CountDownLatch attempted = new CountDownLatch(1);
        final AtomicBoolean succeeded = new AtomicBoolean(false);
        final List<Map.Entry<IndexSet, Message>> batch = createMessageBatch(1024, 50);

        triggerFloodStage(INDEX_NAME);
        final Future<List<String>> indexing;
        try {
            indexing = background(() -> this.messages.bulkIndex(batch, createIndexingListener(attempted, succeeded)));
            awaitIndexingAttempt(attempted);
        } finally {
            // Always restore the watermarks, otherwise a failure in here leaves the zero percent
            // watermarks in place for whatever uses the cluster next.
            resetFloodStage(INDEX_NAME);
        }

        Assertions.assertThat(indexing.get(BACKGROUND_TIMEOUT_MINUTES, TimeUnit.MINUTES)).isEmpty();
        client().refreshNode();
        Assertions.assertThat(messageCount(INDEX_NAME)).isEqualTo(50L);
        Assertions.assertThat(succeeded.get()).isTrue();
    }

    @Test
    public void retryIndexingMessagesIfTargetAliasIsInvalid() throws Exception {
        // Point INDEX_NAME, as an alias, at two indices at once.
        final String firstTarget = client().createRandomIndex("multiple_targets");
        final String secondTarget = client().createRandomIndex("multiple_targets");
        client().deleteIndices(INDEX_NAME);
        client().addAliasMapping(firstTarget, INDEX_NAME);
        client().addAliasMapping(secondTarget, INDEX_NAME);

        final List<Map.Entry<IndexSet, Message>> batch = createMessageBatch(1024, 50);
        final CountDownLatch attempted = new CountDownLatch(1);
        final AtomicBoolean succeeded = new AtomicBoolean(false);

        final Future<List<String>> indexing =
                background(() -> this.messages.bulkIndex(batch, createIndexingListener(attempted, succeeded)));
        awaitIndexingAttempt(attempted);

        // Leave the alias with a single target so that the pending bulk request can go through.
        client().removeAliasMapping(secondTarget, INDEX_NAME);

        Assertions.assertThat(indexing.get(BACKGROUND_TIMEOUT_MINUTES, TimeUnit.MINUTES)).isEmpty();
        client().refreshNode();
        Assertions.assertThat(messageCount(INDEX_NAME)).isEqualTo(50L);
        Assertions.assertThat(succeeded.get()).isTrue();
    }

    @Test
    public void properlySerializesCustomObjectsInMessageField() throws IOException {
        final Message message = new Message("Some message", "somesource", now());
        message.addField("custom_object", new TextNode("foo"));

        Assertions.assertThat(this.messages.bulkIndex(ImmutableList.of(Maps.immutableEntry(indexSet, message)))).isEmpty();

        client().refreshNode();
        Assertions.assertThat(this.messages.get(message.getId(), INDEX_NAME).getMessage().getField("custom_object"))
                .isEqualTo("foo");
    }

    /**
     * Builds a listener that releases {@code attempted} on the first retry or success, whichever
     * comes first, and sets {@code succeeded} once indexing reports success.
     */
    private Messages.IndexingListener createIndexingListener(final CountDownLatch attempted, final AtomicBoolean succeeded) {
        return new Messages.IndexingListener() {
            @Override
            public void onRetry(long attemptNumber) {
                attempted.countDown();
            }

            @Override
            public void onSuccess(long delaySinceFirstAttempt) {
                if (attempted.getCount() > 0) {
                    attempted.countDown();
                }
                succeeded.set(true);
            }
        };
    }

    /**
     * Waits until the indexing listener has released the given latch and fails the test if that
     * does not happen in time, so that a broken background task cannot block the build forever.
     */
    private void awaitIndexingAttempt(CountDownLatch attempted) throws InterruptedException {
        Assertions.assertThat(attempted.await(BACKGROUND_TIMEOUT_MINUTES, TimeUnit.MINUTES)).isTrue();
    }

    /**
     * Runs the given task on a dedicated, named thread.
     *
     * @param task the work to run in the background
     * @return a future for the result of the task
     */
    private Future<List<String>> background(Callable<List<String>> task) {
        final ExecutorService executor = Executors.newFixedThreadPool(
                1, new ThreadFactoryBuilder().setNameFormat("messages-it-%d").build());
        try {
            return executor.submit(task);
        } finally {
            // An orderly shutdown still runs the task submitted above but lets the worker thread
            // terminate afterwards instead of leaking one thread per call.
            executor.shutdown();
        }
    }

    /** Sets all three disk watermarks to 0% and waits for the index block on the given index. */
    private void triggerFloodStage(String index) {
        client().putSetting("cluster.routing.allocation.disk.watermark.low", "0%");
        client().putSetting("cluster.routing.allocation.disk.watermark.high", "0%");
        client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "0%");
        client().waitForIndexBlock(index);
    }

    /** Sets the disk watermarks to 85%, 90% and 95% and resets the index block on the given index. */
    private void resetFloodStage(String index) {
        client().putSetting("cluster.routing.allocation.disk.watermark.flood_stage", "95%");
        client().putSetting("cluster.routing.allocation.disk.watermark.high", "90%");
        client().putSetting("cluster.routing.allocation.disk.watermark.low", "85%");
        client().resetIndexBlock(index);
    }

    private Map.Entry<IndexSet, Message> entry(IndexSet set, Message message) {
        return new AbstractMap.SimpleEntry<>(set, message);
    }

    private DateTime now() {
        return DateTime.now(DateTimeZone.UTC);
    }

    /**
     * Creates a mutable batch of messages for the test index set.
     *
     * @param size  number of {@code 'A'} characters appended to the running number that starts each message text
     * @param count number of messages to create
     * @return the batch, in creation order
     */
    private List<Map.Entry<IndexSet, Message>> createMessageBatch(int size, int count) {
        final List<Map.Entry<IndexSet, Message>> batch = new ArrayList<>(count);
        final String padding = Strings.repeat('A', size);
        for (int i = 0; i < count; i++) {
            batch.add(Maps.immutableEntry(indexSet, new Message(i + padding, "source", now())));
        }
        return batch;
    }
}
