package org.graylog.plugins.pipelineprocessor.functions.messages;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import jakarta.inject.Provider;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.graylog.plugins.pipelineprocessor.BaseParserTest;
import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionArgs;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.db.RuleDao;
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigDto;
import org.graylog.plugins.pipelineprocessor.db.RuleMetricsConfigService;
import org.graylog.plugins.pipelineprocessor.db.RuleService;
import org.graylog.plugins.pipelineprocessor.db.mongodb.MongoDbPipelineService;
import org.graylog.plugins.pipelineprocessor.db.mongodb.MongoDbPipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.functions.conversion.LongConversion;
import org.graylog.plugins.pipelineprocessor.parser.FunctionRegistry;
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
import org.graylog.plugins.pipelineprocessor.processors.ConfigurationStateUpdater;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections;
import org.graylog.plugins.pipelineprocessor.rulebuilder.RuleBuilder;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageFactory;
import org.graylog2.plugin.TestMessageFactory;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.SuppressForbidden;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.graylog2.streams.StreamService;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/graylog/plugins/pipelineprocessor/functions/messages/MessageCreationLoopPreventionTest.class */
public class MessageCreationLoopPreventionTest extends BaseParserTest {
    private MessageFactory messageFactory = new TestMessageFactory();
    final CloneMessage cloneMessage = (CloneMessage) Mockito.spy(new CloneMessage(this.messageFactory));
    PipelineInterpreter pipelineInterpreter;

    @Before
    @SuppressForbidden("Allow using default thread factory")
    public void createPipelineInterpreter() {
        RuleService ruleService = (RuleService) Mockito.mock(RuleService.class);
        Mockito.when(ruleService.loadAll()).thenReturn(Collections.singleton(RuleDao.create("r1", "title", "description", ruleForTest(), Tools.nowUTC(), (DateTime) null, (RuleBuilder) null, (String) null)));
        PipelineService pipelineService = (PipelineService) Mockito.mock(MongoDbPipelineService.class);
        Mockito.when(pipelineService.loadAll()).thenReturn(Collections.singleton(PipelineDao.create("p1", "title", "description", "pipeline \"pipeline\"\nstage 0 match all\n    rule \"test rule\";\nend\n", Tools.nowUTC(), (DateTime) null)));
        MessageQueueAcknowledger messageQueueAcknowledger = (MessageQueueAcknowledger) Mockito.mock(MessageQueueAcknowledger.class);
        EventBus eventBus = (EventBus) Mockito.mock(EventBus.class);
        RuleMetricsConfigService ruleMetricsConfigService = (RuleMetricsConfigService) Mockito.mock(RuleMetricsConfigService.class);
        Mockito.when(ruleMetricsConfigService.get()).thenReturn(RuleMetricsConfigDto.createDefault());
        PipelineStreamConnectionsService pipelineStreamConnectionsService = (PipelineStreamConnectionsService) Mockito.mock(MongoDbPipelineStreamConnectionsService.class);
        Mockito.when(pipelineStreamConnectionsService.loadAll()).thenReturn(Collections.singleton(PipelineConnections.create("p1", defaultStream.getId(), (Set) pipelineService.loadAll().stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet()))));
        Stream stream = (Stream) Mockito.mock(Stream.class, "Other Stream");
        Mockito.when(stream.isPaused()).thenReturn(false);
        Mockito.when(stream.getTitle()).thenReturn("other stream");
        Mockito.when(stream.getId()).thenReturn("Other Stream");
        StreamService streamService = (StreamService) Mockito.mock(StreamService.class);
        Mockito.when(streamService.loadAll()).thenReturn(Lists.newArrayList(new Stream[]{defaultStream, stream}));
        Mockito.when(streamService.loadAllEnabled()).thenReturn(Lists.newArrayList(new Stream[]{defaultStream, stream}));
        StreamCacheService streamCacheService = new StreamCacheService(eventBus, streamService, (ScheduledExecutorService) null);
        streamCacheService.startAsync().awaitRunning();
        Provider provider = () -> {
            return defaultStream;
        };
        this.pipelineInterpreter = new PipelineInterpreter(messageQueueAcknowledger, new MetricRegistry(), new ConfigurationStateUpdater(ruleService, pipelineService, pipelineStreamConnectionsService, new PipelineRuleParser(new FunctionRegistry(ImmutableMap.of("clone_message", this.cloneMessage, "has_field", new HasField(), "set_field", new SetField(), "to_long", new LongConversion(), "remove_from_stream", new RemoveFromStream(streamCacheService, provider), "route_to_stream", new RouteToStream(streamCacheService, provider)))), ruleMetricsConfigService, new MetricRegistry(), Executors.newScheduledThreadPool(1), eventBus, (immutableMap, immutableSetMultimap, ruleMetricsConfigDto) -> {
            return new PipelineInterpreter.State(immutableMap, immutableSetMultimap, ruleMetricsConfigDto, new MetricRegistry(), 1, true);
        }));
    }

    @Test
    public void loopPreventionBasic() {
        Message[] messageArr = (Message[]) Iterables.toArray(this.pipelineInterpreter.process(messageInDefaultStream()), Message.class);
        ((CloneMessage) Mockito.verify(this.cloneMessage, Mockito.times(101))).evaluate((FunctionArgs) ArgumentMatchers.any(), (EvaluationContext) ArgumentMatchers.any());
        Assert.assertEquals(101L, messageArr.length);
    }

    @Test
    public void loopPreventionParam() {
        Message[] messageArr = (Message[]) Iterables.toArray(this.pipelineInterpreter.process(messageInDefaultStream()), Message.class);
        ((CloneMessage) Mockito.verify(this.cloneMessage, Mockito.times(2))).evaluate((FunctionArgs) ArgumentMatchers.any(), (EvaluationContext) ArgumentMatchers.any());
        Assert.assertEquals(2L, messageArr.length);
    }

    @Test
    public void loopPreventionWorkaround1() {
        Message[] messageArr = (Message[]) Iterables.toArray(this.pipelineInterpreter.process(messageInDefaultStream()), Message.class);
        ((CloneMessage) Mockito.verify(this.cloneMessage, Mockito.times(1))).evaluate((FunctionArgs) ArgumentMatchers.any(), (EvaluationContext) ArgumentMatchers.any());
        Assert.assertEquals(2L, messageArr.length);
    }

    @Test
    public void loopPreventionWorkaround2() {
        Message[] messageArr = (Message[]) Iterables.toArray(this.pipelineInterpreter.process(messageInDefaultStream()), Message.class);
        ((CloneMessage) Mockito.verify(this.cloneMessage, Mockito.times(2))).evaluate((FunctionArgs) ArgumentMatchers.any(), (EvaluationContext) ArgumentMatchers.any());
        Assert.assertEquals(3L, messageArr.length);
    }

    @Test
    public void loopPreventionRecursive() {
        Message messageInDefaultStream = messageInDefaultStream();
        messageInDefaultStream.addField("cycle", 5);
        Message[] messageArr = (Message[]) Iterables.toArray(this.pipelineInterpreter.process(messageInDefaultStream), Message.class);
        ((CloneMessage) Mockito.verify(this.cloneMessage, Mockito.times(5))).evaluate((FunctionArgs) ArgumentMatchers.any(), (EvaluationContext) ArgumentMatchers.any());
        Assert.assertEquals(6L, messageArr.length);
    }

    @Test
    public void loopPreventionRecursiveFail() {
        Message messageInDefaultStream = messageInDefaultStream();
        messageInDefaultStream.addField("cycle", 110);
        Message[] messageArr = (Message[]) Iterables.toArray(this.pipelineInterpreter.process(messageInDefaultStream), Message.class);
        ((CloneMessage) Mockito.verify(this.cloneMessage, Mockito.times(101))).evaluate((FunctionArgs) ArgumentMatchers.any(), (EvaluationContext) ArgumentMatchers.any());
        Assert.assertEquals(101L, messageArr.length);
    }

    @Test
    public void loopPreventionRecursiveParam() {
        Message messageInDefaultStream = messageInDefaultStream();
        messageInDefaultStream.addField("cycle", 110);
        Message[] messageArr = (Message[]) Iterables.toArray(this.pipelineInterpreter.process(messageInDefaultStream), Message.class);
        ((CloneMessage) Mockito.verify(this.cloneMessage, Mockito.times(110))).evaluate((FunctionArgs) ArgumentMatchers.any(), (EvaluationContext) ArgumentMatchers.any());
        Assert.assertEquals(111L, messageArr.length);
    }

    private Message messageInDefaultStream() {
        Message createMessage = this.messageFactory.createMessage("original message", "test", Tools.nowUTC());
        createMessage.addStream(defaultStream);
        return createMessage;
    }
}
