package org.graylog.plugins.pipelineprocessor.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Named;
import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
import org.graylog.plugins.pipelineprocessor.ast.Rule;
import org.graylog.plugins.pipelineprocessor.ast.Stage;
import org.graylog.plugins.pipelineprocessor.ast.statements.Statement;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.db.RuleDao;
import org.graylog.plugins.pipelineprocessor.db.RuleService;
import org.graylog.plugins.pipelineprocessor.events.PipelineConnectionsChangedEvent;
import org.graylog.plugins.pipelineprocessor.events.PipelinesChangedEvent;
import org.graylog.plugins.pipelineprocessor.events.RulesChangedEvent;
import org.graylog.plugins.pipelineprocessor.parser.ParseException;
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
import org.graylog.plugins.pipelineprocessor.processors.listeners.InterpreterListener;
import org.graylog.plugins.pipelineprocessor.processors.listeners.NoopInterpreterListener;
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageCollection;
import org.graylog2.plugin.Messages;
import org.graylog2.plugin.messageprocessors.MessageProcessor;
import org.graylog2.shared.buffers.processors.ProcessBufferProcessor;
import org.graylog2.shared.journal.Journal;
import org.jooq.lambda.tuple.Tuple;
import org.jooq.lambda.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.class */
public class PipelineInterpreter implements MessageProcessor {
    private static final Logger log = LoggerFactory.getLogger(PipelineInterpreter.class);
    public static final String GL2_PROCESSING_ERROR = "gl2_processing_error";
    private final RuleService ruleService;
    private final PipelineService pipelineService;
    private final PipelineStreamConnectionsService pipelineStreamConnectionsService;
    private final PipelineRuleParser pipelineRuleParser;
    private final Journal journal;
    private final MetricRegistry metricRegistry;
    private final ScheduledExecutorService scheduler;
    private final Meter filteredOutMessages;
    private EventBus serverEventBus;
    private final AtomicReference<ImmutableMap<String, Pipeline>> currentPipelines = new AtomicReference<>(ImmutableMap.of());
    private final AtomicReference<ImmutableSetMultimap<String, Pipeline>> streamPipelineConnections = new AtomicReference<>(ImmutableSetMultimap.of());

    /* loaded from: input_file:org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter$Descriptor.class */
    public static class Descriptor implements MessageProcessor.Descriptor {
        public String name() {
            return "Pipeline Processor";
        }

        public String className() {
            return PipelineInterpreter.class.getCanonicalName();
        }
    }

    @Inject
    public PipelineInterpreter(RuleService ruleService, PipelineService pipelineService, PipelineStreamConnectionsService pipelineStreamConnectionsService, PipelineRuleParser pipelineRuleParser, Journal journal, MetricRegistry metricRegistry, @Named("daemonScheduler") ScheduledExecutorService scheduledExecutorService, EventBus eventBus) {
        this.ruleService = ruleService;
        this.pipelineService = pipelineService;
        this.pipelineStreamConnectionsService = pipelineStreamConnectionsService;
        this.pipelineRuleParser = pipelineRuleParser;
        this.journal = journal;
        this.metricRegistry = metricRegistry;
        this.scheduler = scheduledExecutorService;
        this.filteredOutMessages = metricRegistry.meter(MetricRegistry.name(ProcessBufferProcessor.class, new String[]{"filteredOutMessages"}));
        this.serverEventBus = eventBus;
        eventBus.register(this);
        reload();
    }

    public void stop() {
        this.serverEventBus.unregister(this);
    }

    private synchronized void reload() {
        Pipeline empty;
        Rule alwaysFalse;
        HashMap newHashMap = Maps.newHashMap();
        for (RuleDao ruleDao : this.ruleService.loadAll()) {
            try {
                alwaysFalse = this.pipelineRuleParser.parseRule(ruleDao.id(), ruleDao.source(), false);
            } catch (ParseException e) {
                alwaysFalse = Rule.alwaysFalse("Failed to parse rule: " + ruleDao.id());
            }
            newHashMap.put(alwaysFalse.name(), alwaysFalse);
        }
        HashMap newHashMap2 = Maps.newHashMap();
        for (PipelineDao pipelineDao : this.pipelineService.loadAll()) {
            try {
                empty = this.pipelineRuleParser.parsePipeline(pipelineDao.id(), pipelineDao.source());
            } catch (ParseException e2) {
                empty = Pipeline.empty("Failed to parse pipeline" + pipelineDao.id());
            }
            newHashMap2.put(pipelineDao.id(), empty);
        }
        newHashMap2.values().stream().flatMap(pipeline -> {
            log.debug("Resolving pipeline {}", pipeline.name());
            return pipeline.stages().stream();
        }).forEach(stage -> {
            stage.setRules((List) stage.ruleReferences().stream().map(str -> {
                Rule rule = (Rule) newHashMap.get(str);
                if (rule == null) {
                    rule = Rule.alwaysFalse("Unresolved rule " + str);
                }
                log.debug("Resolved rule `{}` to {}", str, rule);
                return rule;
            }).collect(Collectors.toList()));
        });
        this.currentPipelines.set(ImmutableMap.copyOf(newHashMap2));
        HashMultimap create = HashMultimap.create();
        for (PipelineConnections pipelineConnections : this.pipelineStreamConnectionsService.loadAll()) {
            Stream<String> stream = pipelineConnections.pipelineIds().stream();
            newHashMap2.getClass();
            stream.map((v1) -> {
                return r1.get(v1);
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).forEach(pipeline2 -> {
                create.put(pipelineConnections.streamId(), pipeline2);
            });
        }
        this.streamPipelineConnections.set(ImmutableSetMultimap.copyOf(create));
    }

    public Messages process(Messages messages) {
        return process(messages, new NoopInterpreterListener());
    }

    public Messages process(Messages messages, InterpreterListener interpreterListener) {
        Set copyOf;
        interpreterListener.startProcessing();
        HashSet newHashSet = Sets.newHashSet();
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList(messages);
        while (!newArrayList2.isEmpty()) {
            MessageCollection messageCollection = new MessageCollection(newArrayList2);
            newArrayList2.clear();
            Iterator it = messageCollection.iterator();
            while (it.hasNext()) {
                Message message = (Message) it.next();
                String id = message.getId();
                Set set = (Set) message.getStreams().stream().map((v0) -> {
                    return v0.getId();
                }).collect(Collectors.toSet());
                ImmutableSetMultimap<String, Pipeline> immutableSetMultimap = this.streamPipelineConnections.get();
                if (!set.isEmpty()) {
                    Stream filter = set.stream().filter(str -> {
                        return !newHashSet.contains(Tuple.tuple(id, str));
                    });
                    immutableSetMultimap.getClass();
                    Set<String> set2 = (Set) filter.filter((v1) -> {
                        return r1.containsKey(v1);
                    }).collect(Collectors.toSet());
                    copyOf = ImmutableSet.copyOf((Collection) set2.stream().flatMap(str2 -> {
                        return immutableSetMultimap.get(str2).stream();
                    }).collect(Collectors.toSet()));
                    interpreterListener.processStreams(message, copyOf, set2);
                    log.debug("[{}] running pipelines {} for streams {}", new Object[]{id, copyOf, set2});
                } else if (newHashSet.contains(Tuple.tuple(id, "default"))) {
                    copyOf = ImmutableSet.of();
                    log.debug("[{}] already processed default stream, skipping", id);
                } else {
                    copyOf = immutableSetMultimap.get("default");
                    interpreterListener.processDefaultStream(message, copyOf);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] running default stream pipelines: [{}]", id, copyOf.stream().map((v0) -> {
                            return v0.name();
                        }).toArray());
                    }
                }
                newArrayList2.addAll(processForPipelines(message, id, (Set) copyOf.stream().map((v0) -> {
                    return v0.id();
                }).collect(Collectors.toSet()), interpreterListener));
                boolean z = false;
                for (org.graylog2.plugin.streams.Stream stream : message.getStreams()) {
                    if (set.remove(stream.getId())) {
                        newHashSet.add(Tuple.tuple(id, stream.getId()));
                    } else {
                        z = true;
                    }
                }
                if (message.getFilterOut()) {
                    log.debug("[{}] marked message to be discarded. Dropping message.", id);
                    this.filteredOutMessages.mark();
                    this.journal.markJournalOffsetCommitted(message.getJournalOffset());
                }
                if (!z || message.getFilterOut()) {
                    log.debug("[{}] no new streams matches or dropped message, not running again", id);
                    newArrayList.add(message);
                } else {
                    log.debug("[{}] new streams assigned, running again for those streams", id);
                    newArrayList2.add(message);
                }
            }
        }
        interpreterListener.finishProcessing();
        return new MessageCollection(newArrayList);
    }

    public List<Message> processForPipelines(Message message, String str, Set<String> set, InterpreterListener interpreterListener) {
        return processForResolvedPipelines(message, str, ImmutableSet.copyOf((Collection) set.stream().map(str2 -> {
            return (Pipeline) this.currentPipelines.get().get(str2);
        }).filter(pipeline -> {
            return pipeline != null;
        }).collect(Collectors.toSet())), interpreterListener);
    }

    public List<Message> processForResolvedPipelines(Message message, String str, Set<Pipeline> set, InterpreterListener interpreterListener) {
        ArrayList arrayList = new ArrayList();
        set.forEach(pipeline -> {
            this.metricRegistry.counter(MetricRegistry.name(Pipeline.class, new String[]{pipeline.id(), "executed"})).inc();
        });
        StageIterator stageIterator = new StageIterator(set);
        HashSet newHashSet = Sets.newHashSet();
        while (stageIterator.hasNext()) {
            for (Tuple2 tuple2 : (Set) stageIterator.next()) {
                Stage stage = (Stage) tuple2.v1();
                Pipeline pipeline2 = (Pipeline) tuple2.v2();
                if (newHashSet.contains(pipeline2)) {
                    log.debug("[{}] previous stage result prevents further processing of pipeline `{}`", str, pipeline2.name());
                } else {
                    this.metricRegistry.counter(MetricRegistry.name(Pipeline.class, new String[]{pipeline2.id(), "stage", String.valueOf(stage.stage()), "executed"})).inc();
                    interpreterListener.enterStage(stage);
                    Logger logger = log;
                    Object[] objArr = new Object[3];
                    objArr[0] = str;
                    objArr[1] = Integer.valueOf(stage.stage());
                    objArr[2] = stage.matchAll() ? "all" : "either";
                    logger.debug("[{}] evaluating rule conditions in stage {}: match {}", objArr);
                    EvaluationContext evaluationContext = new EvaluationContext(message);
                    ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(stage.getRules().size());
                    boolean z = false;
                    for (Rule rule : stage.getRules()) {
                        interpreterListener.evaluateRule(rule, pipeline2);
                        if (rule.when().evaluateBool(evaluationContext)) {
                            z = true;
                            countRuleExecution(rule, pipeline2, stage, "matched");
                            if (evaluationContext.hasEvaluationErrors()) {
                                EvaluationContext.EvalError evalError = (EvaluationContext.EvalError) Iterables.getLast(evaluationContext.evaluationErrors());
                                appendProcessingError(rule, message, evalError.toString());
                                interpreterListener.failEvaluateRule(rule, pipeline2);
                                log.debug("Encountered evaluation error during condition, skipping rule actions: {}", evalError);
                            } else {
                                interpreterListener.satisfyRule(rule, pipeline2);
                                log.debug("[{}] rule `{}` matches, scheduling to run", str, rule.name());
                                newArrayListWithCapacity.add(rule);
                            }
                        } else {
                            countRuleExecution(rule, pipeline2, stage, "not-matched");
                            interpreterListener.dissatisfyRule(rule, pipeline2);
                            log.debug("[{}] rule `{}` does not match", str, rule.name());
                        }
                    }
                    Iterator it = newArrayListWithCapacity.iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            break;
                        }
                        Rule rule2 = (Rule) it.next();
                        countRuleExecution(rule2, pipeline2, stage, "executed");
                        interpreterListener.executeRule(rule2, pipeline2);
                        log.debug("[{}] rule `{}` matched running actions", str, rule2.name());
                        Iterator<Statement> it2 = rule2.then().iterator();
                        while (it2.hasNext()) {
                            it2.next().evaluate(evaluationContext);
                            if (evaluationContext.hasEvaluationErrors()) {
                                EvaluationContext.EvalError evalError2 = (EvaluationContext.EvalError) Iterables.getLast(evaluationContext.evaluationErrors());
                                appendProcessingError(rule2, message, evalError2.toString());
                                interpreterListener.failExecuteRule(rule2, pipeline2);
                                log.debug("Encountered evaluation error, skipping rest of the rule: {}", evalError2);
                                countRuleExecution(rule2, pipeline2, stage, "failed");
                                break;
                            }
                        }
                    }
                    if (!(stage.matchAll() && newArrayListWithCapacity.size() == stage.getRules().size()) && (newArrayListWithCapacity.size() <= 0 || !z)) {
                        interpreterListener.stopPipelineExecution(pipeline2, stage);
                        Logger logger2 = log;
                        Object[] objArr2 = new Object[4];
                        objArr2[0] = str;
                        objArr2[1] = Integer.valueOf(stage.stage());
                        objArr2[2] = pipeline2.name();
                        objArr2[3] = stage.matchAll() ? "all" : "either";
                        logger2.debug("[{}] stage {} for pipeline `{}` required match: {}, NOT ok to proceed with next stage", objArr2);
                        newHashSet.add(pipeline2);
                    } else {
                        interpreterListener.continuePipelineExecution(pipeline2, stage);
                        Logger logger3 = log;
                        Object[] objArr3 = new Object[4];
                        objArr3[0] = str;
                        objArr3[1] = Integer.valueOf(stage.stage());
                        objArr3[2] = pipeline2.name();
                        objArr3[3] = stage.matchAll() ? "all" : "either";
                        logger3.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage", objArr3);
                    }
                    Iterables.addAll(arrayList, evaluationContext.createdMessages());
                    evaluationContext.clearCreatedMessages();
                    interpreterListener.exitStage(stage);
                }
            }
        }
        return arrayList;
    }

    private void countRuleExecution(Rule rule, Pipeline pipeline, Stage stage, String str) {
        this.metricRegistry.counter(MetricRegistry.name(Rule.class, new String[]{rule.id(), str})).inc();
        this.metricRegistry.counter(MetricRegistry.name(Rule.class, new String[]{rule.id(), pipeline.id(), String.valueOf(stage.stage()), str})).inc();
    }

    private void appendProcessingError(Rule rule, Message message, String str) {
        String str2 = "For rule '" + rule.name() + "': " + str;
        if (message.hasField(GL2_PROCESSING_ERROR)) {
            message.addField(GL2_PROCESSING_ERROR, ((String) message.getFieldAs(String.class, GL2_PROCESSING_ERROR)) + "," + str2);
        } else {
            message.addField(GL2_PROCESSING_ERROR, str2);
        }
    }

    @Subscribe
    public void handleRuleChanges(RulesChangedEvent rulesChangedEvent) {
        rulesChangedEvent.deletedRuleIds().forEach(str -> {
            log.debug("Invalidated rule {}", str);
            this.metricRegistry.removeMatching((str, metric) -> {
                return str.startsWith(MetricRegistry.name(Rule.class, new String[]{str}));
            });
        });
        rulesChangedEvent.updatedRuleIds().forEach(str2 -> {
            log.debug("Refreshing rule {}", str2);
        });
        this.scheduler.schedule(this::reload, 0L, TimeUnit.SECONDS);
    }

    @Subscribe
    public void handlePipelineChanges(PipelinesChangedEvent pipelinesChangedEvent) {
        pipelinesChangedEvent.deletedPipelineIds().forEach(str -> {
            log.debug("Invalidated pipeline {}", str);
            this.metricRegistry.removeMatching((str, metric) -> {
                return str.startsWith(MetricRegistry.name(Pipeline.class, new String[]{str}));
            });
        });
        pipelinesChangedEvent.updatedPipelineIds().forEach(str2 -> {
            log.debug("Refreshing pipeline {}", str2);
        });
        this.scheduler.schedule(this::reload, 0L, TimeUnit.SECONDS);
    }

    @Subscribe
    public void handlePipelineConnectionChanges(PipelineConnectionsChangedEvent pipelineConnectionsChangedEvent) {
        log.debug("Pipeline stream connection changed: {}", pipelineConnectionsChangedEvent);
        this.scheduler.schedule(this::reload, 0L, TimeUnit.SECONDS);
    }
}
