package net.pincette.mongo.streams;

import com.mongodb.reactivestreams.client.MongoDatabase;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.function.SideEffect;
import net.pincette.json.JsonUtil;
import net.pincette.mongo.Features;
import net.pincette.util.Collections;
import net.pincette.util.Pair;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:net/pincette/mongo/streams/Pipeline.class */
/**
 * Builds a Kafka Streams topology fragment from a JSON array of aggregation-style stages.
 *
 * <p>Each element of the array that is a JSON object with exactly one field is looked up in the
 * stage registry by that field's name. The matching {@code Stage} is applied to the stream that
 * resulted from the previous stage, with the field's value as the stage expression. Elements that
 * are not objects, that don't have exactly one field, or whose field name is not registered are
 * skipped silently.
 */
public class Pipeline {
    private static final Logger logger = Logger.getLogger("net.pincette.mongo.streams");
    private static final long NANOS_PER_MILLI = 1_000_000L;
    private static final String ADD_FIELDS = "$addFields";
    private static final String BUCKET = "$bucket";
    private static final String COUNT = "$count";
    private static final String DELETE = "$delete";
    private static final String GROUP = "$group";
    private static final String JSLT = "$jslt";
    private static final String LOOKUP = "$lookup";
    private static final String MATCH = "$match";
    private static final String MERGE = "$merge";
    private static final String OUT = "$out";
    private static final String PROBE = "$probe";
    private static final String PROJECT = "$project";
    private static final String REDACT = "$redact";
    private static final String REPLACE_ROOT = "$replaceRoot";
    private static final String REPLACE_WITH = "$replaceWith";
    private static final String SET = "$set";
    private static final String SET_KEY = "$setKey";
    private static final String TRACE = "$trace";
    private static final String UNSET = "$unset";
    private static final String UNWIND = "$unwind";

    /**
     * Registry of the supported stages, keyed by stage name. {@code $replaceWith} shares the
     * implementation of {@code $replaceRoot} and {@code $set} shares that of {@code $addFields}.
     * The {@code $probe} and {@code $unwind} implementations don't take the context, hence the
     * adapting lambdas.
     */
    private static final Map<String, Stage> stages = Collections.map(
            Pair.pair(ADD_FIELDS, AddFields::stage),
            Pair.pair(BUCKET, Bucket::stage),
            Pair.pair(COUNT, Count::stage),
            Pair.pair(DELETE, Delete::stage),
            Pair.pair(GROUP, Group::stage),
            Pair.pair(JSLT, Jslt::stage),
            Pair.pair(LOOKUP, Lookup::stage),
            Pair.pair(MATCH, Match::stage),
            Pair.pair(MERGE, Merge::stage),
            Pair.pair(OUT, Out::stage),
            Pair.pair(PROBE, (stream, expression, context) -> Probe.stage(stream, expression)),
            Pair.pair(PROJECT, Project::stage),
            Pair.pair(REDACT, Redact::stage),
            Pair.pair(REPLACE_ROOT, ReplaceRoot::stage),
            Pair.pair(REPLACE_WITH, ReplaceRoot::stage),
            Pair.pair(SET, AddFields::stage),
            Pair.pair(SET_KEY, SetKey::stage),
            Pair.pair(TRACE, Trace::stage),
            Pair.pair(UNSET, Unset::stage),
            Pair.pair(UNWIND, (stream, expression, context) -> Unwind.stage(stream, expression)));

    private Pipeline() {
        // Static utility class, not meant to be instantiated.
    }

    /**
     * Creates the pipeline without profiling and without features.
     *
     * @param str identifier handed to the stage context (presumably the application name; to be
     *     confirmed against {@code Context}).
     * @param kStream the stream the first stage is attached to.
     * @param jsonArray the stage specifications, in the order in which they are applied.
     * @param mongoDatabase the database handed to the stage context.
     * @return the stream produced by the last applied stage, or {@code kStream} itself when no
     *     stage was applied.
     */
    public static KStream<String, JsonObject> create(String str, KStream<String, JsonObject> kStream, JsonArray jsonArray, MongoDatabase mongoDatabase) {
        return create(str, kStream, jsonArray, mongoDatabase, false);
    }

    /**
     * Creates the pipeline without features.
     *
     * @param str identifier handed to the stage context.
     * @param kStream the stream the first stage is attached to.
     * @param jsonArray the stage specifications, in the order in which they are applied.
     * @param mongoDatabase the database handed to the stage context.
     * @param z when {@code true} every stage is wrapped so that the time it takes is logged.
     * @return the stream produced by the last applied stage, or {@code kStream} itself when no
     *     stage was applied.
     */
    public static KStream<String, JsonObject> create(String str, KStream<String, JsonObject> kStream, JsonArray jsonArray, MongoDatabase mongoDatabase, boolean z) {
        return create(str, kStream, jsonArray, mongoDatabase, z, null);
    }

    /**
     * Creates the pipeline by chaining the recognised stages of {@code jsonArray} onto
     * {@code kStream}.
     *
     * @param str identifier handed to the stage context.
     * @param kStream the stream the first stage is attached to.
     * @param jsonArray the stage specifications, in the order in which they are applied.
     * @param mongoDatabase the database handed to the stage context.
     * @param z when {@code true} every stage is wrapped so that the time it takes is logged at
     *     level INFO.
     * @param features extra features handed to the stage context. The shorter overloads pass
     *     {@code null} here.
     * @return the stream produced by the last applied stage, or {@code kStream} itself when no
     *     stage was applied.
     */
    public static KStream<String, JsonObject> create(String str, KStream<String, JsonObject> kStream, JsonArray jsonArray, MongoDatabase mongoDatabase, boolean z, Features features) {
        final Context context = new Context(str, mongoDatabase, z, features);
        KStream<String, JsonObject> result = kStream;

        for (final JsonValue element : jsonArray) {
            if (!JsonUtil.isObject(element)) {
                continue;
            }

            final JsonObject specification = element.asJsonObject();
            final String stageName = name(specification).orElse(null);
            final Stage stage = stageName != null ? stages.get(stageName) : null;

            // Unknown or malformed stages are ignored rather than rejected.
            if (stage == null) {
                continue;
            }

            final Stage effective = z ? wrapProfile(stage, stageName) : stage;

            result = effective.apply(result, specification.getValue("/" + stageName), context);
        }

        return result;
    }

    /**
     * Appends a pass-through step that logs how long ago the start mark in {@code jArr[0]} was
     * set. The values themselves are not modified.
     *
     * @param jArr single-slot holder with the {@link System#nanoTime()} reading written by
     *     {@code markStart}.
     * @param kStream the output stream of the profiled stage.
     * @param str the stage name used in the log message.
     * @param jsonValue the stage expression used in the log message.
     * @return the stream with the logging step appended.
     */
    public static KStream<String, JsonObject> markEnd(long[] jArr, KStream<String, JsonObject> kStream, String str, JsonValue jsonValue) {
        return kStream.mapValues(json -> {
            // Skip serialising the expression for every record when nobody listens.
            if (logger.isLoggable(Level.INFO)) {
                final long elapsedMillis = (System.nanoTime() - jArr[0]) / NANOS_PER_MILLI;

                logger.log(Level.INFO, "{0} with expression {1} took {2}ms", new Object[]{str, JsonUtil.string(jsonValue), Long.valueOf(elapsedMillis)});
            }

            return json;
        });
    }

    /**
     * Appends a pass-through step that records the current monotonic time in {@code jArr[0]} for
     * every value that flows by.
     *
     * @param jArr single-slot holder that receives the {@link System#nanoTime()} reading.
     * @param kStream the input stream of the profiled stage.
     * @return the stream with the time-stamping step appended.
     */
    private static KStream<String, JsonObject> markStart(long[] jArr, KStream<String, JsonObject> kStream) {
        return kStream.mapValues(json -> {
            // A monotonic clock is used because wall-clock time can jump while a record is in flight.
            jArr[0] = System.nanoTime();

            return json;
        });
    }

    /**
     * Extracts the stage name from a stage specification.
     *
     * @param jsonObject the stage specification.
     * @return the only field name of the object, or empty when the object doesn't have exactly
     *     one field.
     */
    private static Optional<String> name(JsonObject jsonObject) {
        if (jsonObject.size() != 1) {
            return Optional.empty();
        }

        return Optional.ofNullable(jsonObject.keySet().iterator().next());
    }

    /**
     * Wraps a stage between a start mark and an end mark so that its duration is logged.
     *
     * <p>NOTE(review): the time slot is a single array element shared by every record that passes
     * through the wrapped stage. Confirm that this is acceptable when the topology is processed
     * by several stream threads.
     *
     * @param stage the stage to profile.
     * @param str the stage name used in the log message.
     * @return the profiling stage.
     */
    private static Stage wrapProfile(Stage stage, String str) {
        final long[] started = new long[1];

        return (stream, expression, context) ->
                markEnd(started, stage.apply(markStart(started, stream), expression, context), str, expression);
    }
}
