/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.internal.utils.persistence.mongo.streaming;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.contrib.persistence.mongodb.JournallingFieldNames$;
import akka.contrib.persistence.mongodb.SnapshottingFieldNames$;
import akka.japi.Pair;
import akka.japi.function.Creator;
import akka.stream.Attributes;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.RestartSettings;
import akka.stream.SystemMaterializer;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.BsonField;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.typesafe.config.Config;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.indices.Index;
import org.eclipse.ditto.internal.utils.persistence.mongo.indices.IndexFactory;
import org.eclipse.ditto.internal.utils.persistence.mongo.indices.IndexInitializer;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;
import org.reactivestreams.Publisher;

@AllValuesAreNonnullByDefault
public class MongoReadJournal {
    public static final String J_ID;
    public static final String S_ID;
    public static final String PRIORITY_TAG_PREFIX = "priority-";
    private static final String AKKA_PERSISTENCE_JOURNAL_AUTO_START = "akka.persistence.journal.auto-start-journals";
    private static final String AKKA_PERSISTENCE_SNAPS_AUTO_START = "akka.persistence.snapshot-store.auto-start-snapshot-stores";
    private static final String JOURNAL_COLLECTION_NAME_KEY = "overrides.journal-collection";
    private static final String SNAPS_COLLECTION_NAME_KEY = "overrides.snaps-collection";
    private static final String J_PROCESSOR_ID;
    private static final String J_TO;
    private static final String J_TAGS;
    private static final String S_PROCESSOR_ID;
    private static final String S_SN;
    private static final String S_SERIALIZED_SNAPSHOT = "s2";
    private static final String LIFECYCLE = "__lifecycle";
    private static final String J_EVENT;
    public static final String J_EVENT_PID;
    public static final String J_EVENT_MANIFEST;
    public static final String J_EVENT_SN;
    private static final Duration MAX_BACK_OFF_DURATION;
    private static final Index TAG_PID_INDEX;
    private final String journalCollection;
    private final String snapsCollection;
    private final DittoMongoClient mongoClient;
    private final IndexInitializer indexInitializer;

    private MongoReadJournal(String journalCollection, String snapsCollection, DittoMongoClient mongoClient, ActorSystem actorSystem) {
        this.journalCollection = journalCollection;
        this.snapsCollection = snapsCollection;
        this.mongoClient = mongoClient;
        Materializer materializer = SystemMaterializer.get((ActorSystem)actorSystem).materializer();
        this.indexInitializer = IndexInitializer.of(mongoClient.getDefaultDatabase(), materializer);
    }

    public static MongoReadJournal newInstance(ActorSystem system) {
        Config config = system.settings().config();
        DefaultMongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of((Config)DefaultScopedConfig.dittoScoped((Config)config));
        return MongoReadJournal.newInstance(config, MongoClientWrapper.newInstance(mongoDbConfig), system);
    }

    public static MongoReadJournal newInstance(Config config, DittoMongoClient mongoClient, ActorSystem actorSystem) {
        String autoStartJournalKey = MongoReadJournal.extractAutoStartConfigKey(config, AKKA_PERSISTENCE_JOURNAL_AUTO_START);
        String autoStartSnapsKey = MongoReadJournal.extractAutoStartConfigKey(config, AKKA_PERSISTENCE_SNAPS_AUTO_START);
        String journalCollection = MongoReadJournal.getOverrideCollectionName(config.getConfig(autoStartJournalKey), JOURNAL_COLLECTION_NAME_KEY);
        String snapshotCollection = MongoReadJournal.getOverrideCollectionName(config.getConfig(autoStartSnapsKey), SNAPS_COLLECTION_NAME_KEY);
        return new MongoReadJournal(journalCollection, snapshotCollection, mongoClient, actorSystem);
    }

    public CompletionStage<Done> ensureTagPidIndex() {
        return this.indexInitializer.createNonExistingIndices(this.journalCollection, List.of(TAG_PID_INDEX));
    }

    public Source<String, NotUsed> getJournalPids(int batchSize, Duration maxIdleTime, Materializer mat) {
        int maxRestarts = this.computeMaxRestarts(maxIdleTime);
        return this.getJournal().withAttributes(Attributes.inputBuffer((int)1, (int)1)).flatMapConcat((akka.japi.function.Function & Serializable)journal -> this.listPidsInJournal((MongoCollection<Document>)journal, "", "", batchSize, mat, MAX_BACK_OFF_DURATION, maxRestarts)).mapConcat((akka.japi.function.Function & Serializable)pids -> pids);
    }

    public Source<Document, NotUsed> getLatestJournalEntries(int batchSize, Duration maxIdleTime, Materializer mat) {
        int maxRestarts = this.computeMaxRestarts(maxIdleTime);
        return this.getJournal().withAttributes(Attributes.inputBuffer((int)1, (int)1)).flatMapConcat((akka.japi.function.Function & Serializable)journal -> this.listLatestJournalEntries((MongoCollection<Document>)journal, "", "", batchSize, MAX_BACK_OFF_DURATION, mat, maxRestarts, J_EVENT_PID, J_EVENT_SN, J_EVENT_MANIFEST)).mapConcat((akka.japi.function.Function & Serializable)pids -> pids);
    }

    private Source<List<Document>, NotUsed> listLatestJournalEntries(MongoCollection<Document> journal, String lowerBoundPid, String tag, int batchSize, Duration maxBackoff, Materializer mat, int maxRestarts, String ... journalFields) {
        return this.unfoldBatchedSource(lowerBoundPid, mat, document -> document.getString((Object)J_ID), actualStartPid -> this.listLatestJournalEntries(journal, (String)actualStartPid, tag, batchSize, maxBackoff, maxRestarts, journalFields));
    }

    public Source<String, NotUsed> getJournalPidsWithTag(String tag, int batchSize, Duration maxIdleTime, Materializer mat) {
        int maxRestarts = this.computeMaxRestarts(maxIdleTime);
        return this.getJournal().withAttributes(Attributes.inputBuffer((int)1, (int)1)).flatMapConcat((akka.japi.function.Function & Serializable)journal -> this.listPidsInJournal((MongoCollection<Document>)journal, "", tag, batchSize, mat, MAX_BACK_OFF_DURATION, maxRestarts)).mapConcat((akka.japi.function.Function & Serializable)pids -> pids);
    }

    public Source<String, NotUsed> getJournalPidsWithTagOrderedByPriorityTag(String tag, Duration maxIdleTime) {
        int maxRestarts = this.computeMaxRestarts(maxIdleTime);
        return this.getJournal().withAttributes(Attributes.inputBuffer((int)1, (int)1)).flatMapConcat((akka.japi.function.Function & Serializable)journal -> this.listPidsInJournalOrderedByPriorityTag((MongoCollection<Document>)journal, tag, MAX_BACK_OFF_DURATION, maxRestarts));
    }

    public Source<String, NotUsed> getJournalPidsAbove(String lowerBoundPid, int batchSize, Materializer mat) {
        return this.getJournal().withAttributes(Attributes.inputBuffer((int)1, (int)1)).flatMapConcat((akka.japi.function.Function & Serializable)journal -> this.listPidsInJournal((MongoCollection<Document>)journal, lowerBoundPid, "", batchSize, mat, MAX_BACK_OFF_DURATION, 0)).mapConcat((akka.japi.function.Function & Serializable)pids -> pids);
    }

    public Source<String, NotUsed> getJournalPidsAboveWithTag(String lowerBoundPid, String tag, int batchSize, Materializer mat) {
        return this.getJournal().withAttributes(Attributes.inputBuffer((int)1, (int)1)).flatMapConcat((akka.japi.function.Function & Serializable)journal -> this.listPidsInJournal((MongoCollection<Document>)journal, lowerBoundPid, tag, batchSize, mat, MAX_BACK_OFF_DURATION, 0)).mapConcat((akka.japi.function.Function & Serializable)pids -> pids);
    }

    public Source<Document, NotUsed> getNewestSnapshotsAbove(String lowerBoundPid, int batchSize, Materializer mat, String ... snapshotFields) {
        return this.getSnapshotStore().withAttributes(Attributes.inputBuffer((int)1, (int)1)).flatMapConcat((akka.japi.function.Function & Serializable)snapshotStore -> this.listNewestSnapshots((MongoCollection<Document>)snapshotStore, lowerBoundPid, batchSize, mat, snapshotFields)).mapConcat((akka.japi.function.Function & Serializable)pids -> pids);
    }

    private Source<List<String>, NotUsed> listPidsInJournal(MongoCollection<Document> journal, String lowerBoundPid, String tag, int batchSize, Materializer mat, Duration maxBackOff, int maxRestarts) {
        return this.unfoldBatchedSource(lowerBoundPid, mat, Function.identity(), actualStartPid -> this.listJournalPidsAbove(journal, (String)actualStartPid, tag, batchSize, maxBackOff, maxRestarts));
    }

    private Source<String, NotUsed> listJournalPidsAbove(MongoCollection<Document> journal, String startPid, String tag, int batchSize, Duration maxBackOff, int maxRestarts) {
        return this.listLatestJournalEntries(journal, startPid, tag, batchSize, maxBackOff, maxRestarts, J_EVENT_PID).flatMapConcat((akka.japi.function.Function & Serializable)document -> {
            Object pid = document.get((Object)J_EVENT_PID);
            if (pid instanceof CharSequence) {
                return Source.single((Object)pid.toString());
            }
            return Source.empty();
        });
    }

    private Source<List<Document>, NotUsed> listNewestSnapshots(MongoCollection<Document> snapshotStore, String lowerBoundPid, int batchSize, Materializer mat, String ... snapshotFields) {
        return this.unfoldBatchedSource(lowerBoundPid, mat, rec$ -> ((SnapshotBatch)rec$).getMaxPid(), actualStartPid -> this.listNewestActiveSnapshotsByBatch(snapshotStore, (String)actualStartPid, batchSize, snapshotFields)).mapConcat((akka.japi.function.Function & Serializable)x -> x).map((akka.japi.function.Function & Serializable)rec$ -> ((SnapshotBatch)rec$).getItems());
    }

    private <T> Source<List<T>, NotUsed> unfoldBatchedSource(String lowerBoundPid, Materializer mat, Function<T, String> seedCreator, Function<String, Source<T, ?>> sourceCreator) {
        return Source.unfoldAsync((Object)"", (akka.japi.function.Function & Serializable)startPid -> {
            String actualStart = lowerBoundPid.compareTo((String)startPid) >= 0 ? lowerBoundPid : startPid;
            return ((CompletionStage)((Source)sourceCreator.apply(actualStart)).runWith((Graph)Sink.seq(), mat)).thenApply(list -> {
                if (list.isEmpty()) {
                    return Optional.empty();
                }
                return Optional.of(Pair.create((Object)((String)seedCreator.apply(list.get(list.size() - 1))), (Object)list));
            });
        }).withAttributes(Attributes.inputBuffer((int)1, (int)1));
    }

    private Source<String, NotUsed> listPidsInJournalOrderedByPriorityTag(MongoCollection<Document> journal, String tag, Duration maxBackOff, int maxRestarts) {
        ArrayList<Bson> pipeline = new ArrayList<Bson>(4);
        if (!tag.isEmpty()) {
            pipeline.add(Aggregates.match((Bson)Filters.eq((String)J_TAGS, (Object)tag)));
        }
        pipeline.add(Aggregates.group((Object)("$" + J_PROCESSOR_ID), (BsonField[])new BsonField[]{Accumulators.last((String)J_TAGS, (Object)("$" + J_TAGS))}));
        BsonDocument arrayFilter = BsonDocument.parse((String)("{\n    $filter: {\n        input: \"$" + J_TAGS + "\",\n        as: \"tags\",\n        cond: {\n            $eq: [\n                {\n                    $substrCP: [\"$$tags\", 0, " + PRIORITY_TAG_PREFIX.length() + "]\n                }\n,                \"priority-\"\n            ]\n        }\n    }\n}"));
        pipeline.add(Aggregates.project((Bson)Projections.computed((String)J_TAGS, (Object)arrayFilter)));
        pipeline.add(Aggregates.sort((Bson)Sorts.orderBy((Bson[])new Bson[]{Sorts.descending((String[])new String[]{J_TAGS})})));
        Duration minBackOff = Duration.ofSeconds(1L);
        double randomFactor = 0.1;
        RestartSettings restartSettings = RestartSettings.create((Duration)minBackOff, (Duration)maxBackOff, (double)0.1).withMaxRestarts(maxRestarts, minBackOff);
        return RestartSource.onFailuresWithBackoff((RestartSettings)restartSettings, (Creator & Serializable)() -> Source.fromPublisher((Publisher)journal.aggregate(pipeline).collation(Collation.builder().locale("en_US").numericOrdering(Boolean.valueOf(true)).build())).flatMapConcat((akka.japi.function.Function & Serializable)document -> {
            Object pid = document.get((Object)J_ID);
            if (pid instanceof CharSequence) {
                return Source.single((Object)pid.toString());
            }
            return Source.empty();
        }));
    }

    private Source<Document, NotUsed> listLatestJournalEntries(MongoCollection<Document> journal, String startPid, String tag, int batchSize, Duration maxBackOff, int maxRestarts, String ... fieldNames) {
        ArrayList<Bson> pipeline = new ArrayList<Bson>(6);
        if (!tag.isEmpty()) {
            pipeline.add(Aggregates.match((Bson)Filters.eq((String)J_TAGS, (Object)tag)));
        }
        if (!startPid.isEmpty()) {
            pipeline.add(Aggregates.match((Bson)Filters.gt((String)J_PROCESSOR_ID, (Object)startPid)));
        }
        pipeline.add(Aggregates.sort((Bson)Sorts.orderBy((Bson[])new Bson[]{Sorts.ascending((String[])new String[]{J_PROCESSOR_ID}), Sorts.descending((String[])new String[]{J_TO})})));
        pipeline.add(Aggregates.limit((int)batchSize));
        pipeline.add(Aggregates.group((Object)("$" + J_PROCESSOR_ID), this.toFirstJournalEntryFields(fieldNames)));
        pipeline.add(Aggregates.sort((Bson)Sorts.ascending((String[])new String[]{J_ID})));
        Duration minBackOff = Duration.ofSeconds(1L);
        double randomFactor = 0.1;
        RestartSettings restartSettings = RestartSettings.create((Duration)minBackOff, (Duration)maxBackOff, (double)0.1).withMaxRestarts(maxRestarts, minBackOff);
        return RestartSource.onFailuresWithBackoff((RestartSettings)restartSettings, (Creator & Serializable)() -> Source.fromPublisher((Publisher)journal.aggregate(pipeline)));
    }

    private List<BsonField> toFirstJournalEntryFields(String ... journalFields) {
        return Arrays.stream(journalFields).map(fieldName -> {
            String serializedFieldName = String.format("$%s.%s", J_EVENT, fieldName);
            BsonArray bsonArray = new BsonArray(List.of(new BsonString(serializedFieldName), new BsonInt32(0)));
            return Accumulators.first((String)fieldName, (Object)new BsonDocument().append("$arrayElemAt", (BsonValue)bsonArray));
        }).collect(Collectors.toList());
    }

    private int computeMaxRestarts(Duration maxDuration) {
        if (MAX_BACK_OFF_DURATION.minus(maxDuration).isNegative()) {
            return Math.max(7, 6 + (int)(maxDuration.toMillis() / MAX_BACK_OFF_DURATION.toMillis()));
        }
        int log2MaxDuration = 63 - Long.numberOfLeadingZeros(maxDuration.getSeconds());
        return Math.max(0, log2MaxDuration);
    }

    private Source<SnapshotBatch, NotUsed> listNewestActiveSnapshotsByBatch(MongoCollection<Document> snapshotStore, String startPid, int batchSize, String ... snapshotFields) {
        ArrayList<Bson> pipeline = new ArrayList<Bson>(5);
        if (!startPid.isEmpty()) {
            pipeline.add(Aggregates.match((Bson)Filters.gt((String)S_PROCESSOR_ID, (Object)startPid)));
        }
        pipeline.add(Aggregates.sort((Bson)Sorts.orderBy((Bson[])new Bson[]{Sorts.ascending((String[])new String[]{S_PROCESSOR_ID}), Sorts.descending((String[])new String[]{S_SN})})));
        pipeline.add(Aggregates.limit((int)batchSize));
        pipeline.add(Aggregates.group((Object)("$" + S_PROCESSOR_ID), this.asFirstSnapshotBsonFields(snapshotFields)));
        pipeline.add(Aggregates.sort((Bson)Sorts.ascending((String[])new String[]{S_ID})));
        String maxPid = "m";
        String items = "i";
        pipeline.add(Aggregates.group(null, (BsonField[])new BsonField[]{Accumulators.max((String)"m", (Object)("$" + S_ID)), Accumulators.push((String)"i", (Object)new Document().append("$cond", (Object)new Document().append("if", (Object)new Document().append("$ne", Arrays.asList("$__lifecycle", "DELETED"))).append("then", (Object)"$$CURRENT").append("else", null)))}));
        pipeline.add(Aggregates.project((Bson)new Document().append("m", (Object)1).append("i", (Object)new Document().append("$setDifference", Arrays.asList("$i", Collections.singletonList(null))))));
        return Source.fromPublisher((Publisher)snapshotStore.aggregate(pipeline)).flatMapConcat((akka.japi.function.Function & Serializable)document -> {
            String theMaxPid = document.getString((Object)"m");
            if (theMaxPid == null) {
                return Source.empty();
            }
            return Source.single((Object)new SnapshotBatch(theMaxPid, document.getList((Object)"i", Document.class)));
        });
    }

    private List<BsonField> asFirstSnapshotBsonFields(String ... snapshotFields) {
        return Stream.concat(Stream.of(LIFECYCLE), Arrays.stream(snapshotFields)).map(fieldName -> {
            String serializedFieldName = String.format("$%s.%s", S_SERIALIZED_SNAPSHOT, fieldName);
            return Accumulators.first((String)fieldName, (Object)serializedFieldName);
        }).collect(Collectors.toList());
    }

    private Source<MongoCollection<Document>, NotUsed> getJournal() {
        return Source.single((Object)this.mongoClient.getDefaultDatabase().getCollection(this.journalCollection));
    }

    private Source<MongoCollection<Document>, NotUsed> getSnapshotStore() {
        return Source.single((Object)this.mongoClient.getDefaultDatabase().getCollection(this.snapsCollection));
    }

    private static String extractAutoStartConfigKey(Config config, String key) {
        List autoStartJournals = config.getStringList(key);
        if (autoStartJournals.size() != 1) {
            String message = String.format("Expect %s to be a singleton list, but it is List(%s)", AKKA_PERSISTENCE_JOURNAL_AUTO_START, String.join((CharSequence)", ", autoStartJournals));
            throw new IllegalArgumentException(message);
        }
        return (String)autoStartJournals.get(0);
    }

    private static String getOverrideCollectionName(Config journalOrSnapsConfig, String key) {
        return journalOrSnapsConfig.getString(key);
    }

    static {
        S_ID = J_ID = JournallingFieldNames$.MODULE$.ID();
        J_PROCESSOR_ID = JournallingFieldNames$.MODULE$.PROCESSOR_ID();
        J_TO = JournallingFieldNames$.MODULE$.TO();
        J_TAGS = JournallingFieldNames$.MODULE$.TAGS();
        S_PROCESSOR_ID = SnapshottingFieldNames$.MODULE$.PROCESSOR_ID();
        S_SN = SnapshottingFieldNames$.MODULE$.SEQUENCE_NUMBER();
        J_EVENT = JournallingFieldNames$.MODULE$.EVENTS();
        J_EVENT_PID = JournallingFieldNames$.MODULE$.PROCESSOR_ID();
        J_EVENT_MANIFEST = JournallingFieldNames$.MODULE$.MANIFEST();
        J_EVENT_SN = JournallingFieldNames$.MODULE$.SEQUENCE_NUMBER();
        MAX_BACK_OFF_DURATION = Duration.ofSeconds(128L);
        TAG_PID_INDEX = IndexFactory.newInstance("ditto_tag_pid", List.of(J_TAGS, J_PROCESSOR_ID), false, true);
    }

    private static final class SnapshotBatch {
        private final String maxPid;
        private final List<Document> items;

        private SnapshotBatch(String maxPid, List<Document> items) {
            this.maxPid = maxPid;
            this.items = items;
        }

        private String getMaxPid() {
            return this.maxPid;
        }

        private List<Document> getItems() {
            return this.items;
        }
    }
}

