package net.pincette.mongo.streams;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.function.SideEffect;
import net.pincette.json.JsonUtil;
import net.pincette.rs.Box;
import net.pincette.rs.Mapper;
import net.pincette.rs.Pipe;
import net.pincette.rs.streams.Message;
import net.pincette.util.Collections;
import net.pincette.util.Pair;
import net.pincette.util.State;

/* loaded from: input_file:net/pincette/mongo/streams/Pipeline.class */
public class Pipeline {
    private static final Logger logger = Logger.getLogger("net.pincette.mongo.streams");
    static final String ADD_FIELDS = "$addFields";
    static final String BACK_TRACE = "$backTrace";
    static final String BUCKET = "$bucket";
    static final String COUNT = "$count";
    static final String DEDUPLICATE = "$deduplicate";
    static final String DELAY = "$delay";
    static final String DELETE = "$delete";
    static final String GROUP = "$group";
    static final String HTTP = "$http";
    static final String JSLT = "$jslt";
    static final String LOOKUP = "$lookup";
    static final String MATCH = "$match";
    static final String MERGE = "$merge";
    static final String OUT = "$out";
    static final String PER = "$per";
    static final String PROBE = "$probe";
    static final String PROJECT = "$project";
    static final String REDACT = "$redact";
    static final String REPLACE_ROOT = "$replaceRoot";
    static final String REPLACE_WITH = "$replaceWith";
    static final String SEND = "$send";
    static final String SET = "$set";
    static final String SET_KEY = "$setKey";
    static final String THROTTLE = "$throttle";
    static final String TRACE = "$trace";
    static final String UNSET = "$unset";
    static final String UNWIND = "$unwind";
    private static final Map<String, Stage> stages = Collections.map(new Pair[]{Pair.pair(ADD_FIELDS, AddFields::stage), Pair.pair(BACK_TRACE, (jsonValue, context) -> {
        return BackTrace.stage(jsonValue);
    }), Pair.pair(BUCKET, Bucket::stage), Pair.pair(COUNT, Count::stage), Pair.pair(DEDUPLICATE, Deduplicate::stage), Pair.pair(DELAY, Delay::stage), Pair.pair(DELETE, Delete::stage), Pair.pair(GROUP, Group::stage), Pair.pair(HTTP, Http::stage), Pair.pair(JSLT, Jslt::stage), Pair.pair(LOOKUP, Lookup::stage), Pair.pair(MATCH, Match::stage), Pair.pair(MERGE, Merge::stage), Pair.pair(OUT, Out::stage), Pair.pair(PER, (jsonValue2, context2) -> {
        return Per.stage(jsonValue2);
    }), Pair.pair(PROBE, Probe::stage), Pair.pair(PROJECT, Project::stage), Pair.pair(REDACT, Redact::stage), Pair.pair(REPLACE_ROOT, ReplaceRoot::stage), Pair.pair(REPLACE_WITH, ReplaceWith::stage), Pair.pair(SEND, Send::stage), Pair.pair(SET, AddFields::stage), Pair.pair(SET_KEY, SetKey::stage), Pair.pair(THROTTLE, (jsonValue3, context3) -> {
        return Throttle.stage(jsonValue3);
    }), Pair.pair(TRACE, Trace::stage), Pair.pair(UNSET, Unset::stage), Pair.pair(UNWIND, (jsonValue4, context4) -> {
        return Unwind.stage(jsonValue4);
    })});

    private Pipeline() {
    }

    public static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> create(JsonArray jsonArray, Context context) {
        Map<String, Stage> merge = context.stageExtensions != null ? Collections.merge(new Map[]{context.stageExtensions, stages}) : stages;
        return (Flow.Processor) jsonArray.stream().filter(JsonUtil::isObject).map((v0) -> {
            return v0.asJsonObject();
        }).map(jsonObject -> {
            return name(jsonObject).flatMap(str -> {
                return Optional.ofNullable((Stage) merge.get(str)).map(stage -> {
                    return (context.trace ? wrapProfile(stage, str) : stage).apply(jsonObject.getValue("/" + str), context);
                });
            });
        }).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).reduce(null, (processor, processor2) -> {
            return processor == null ? processor2 : Box.box(processor, processor2);
        }, (processor3, processor4) -> {
            return processor3;
        });
    }

    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> markEnd(State<Long> state, String str, JsonValue jsonValue) {
        return Mapper.map(message -> {
            return (Message) SideEffect.run(() -> {
                logger.log(Level.INFO, "{0} with expression {1} took {2}ms", new Object[]{str, JsonUtil.string(jsonValue), Long.valueOf(Instant.now().toEpochMilli() - ((Long) state.get()).longValue())});
            }).andThenGet(() -> {
                return message;
            });
        });
    }

    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> markStart(State<Long> state) {
        return Mapper.map(message -> {
            return (Message) SideEffect.run(() -> {
                state.set(Long.valueOf(Instant.now().toEpochMilli()));
            }).andThenGet(() -> {
                return message;
            });
        });
    }

    private static Optional<String> name(JsonObject jsonObject) {
        return Optional.of(jsonObject.keySet()).filter(set -> {
            return set.size() == 1;
        }).map(set2 -> {
            return (String) set2.iterator().next();
        });
    }

    private static Stage wrapProfile(Stage stage, String str) {
        State state = new State();
        return (jsonValue, context) -> {
            return Pipe.pipe(markStart(state)).then(stage.apply(jsonValue, context)).then(markEnd(state, str, jsonValue));
        };
    }
}
