/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.internal.utils.persistence.mongo.streaming;

import akka.Done;
import akka.NotUsed;
import akka.japi.Pair;
import akka.japi.function.Creator;
import akka.japi.function.Function;
import akka.japi.pf.PFBuilder;
import akka.stream.Attributes;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.javadsl.BroadcastHub;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Source;
import com.mongodb.MongoCommandException;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Optional;
import javax.annotation.Nullable;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.ditto.internal.utils.akka.streaming.TimestampPersistence;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MongoTimestampPersistence
implements TimestampPersistence {
    private static final Duration BACKOFF_MIN = Duration.ofSeconds(1L);
    private static final Duration BACKOFF_MAX = Duration.ofMinutes(2L);
    private static final Document SORT_BY_ID_DESC = new Document().append("_id", (Object)-1);
    private static final long MIN_CAPPED_COLLECTION_SIZE_IN_BYTES = 4096L;
    private static final int COLLECTION_ALREADY_EXISTS_ERROR_CODE = 48;
    private static final String FIELD_TIMESTAMP = "ts";
    private static final String FIELD_TAG = "tg";
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoTimestampPersistence.class);
    private final Source<MongoCollection, NotUsed> collectionSource;

    private MongoTimestampPersistence(Source<MongoCollection, NotUsed> collectionSource) {
        this.collectionSource = collectionSource;
    }

    public static MongoTimestampPersistence initializedInstance(String collectionName, DittoMongoClient mongoClient, Materializer materializer) {
        Source<MongoCollection, NotUsed> collectionSource = MongoTimestampPersistence.createOrGetCappedCollection(mongoClient.getDefaultDatabase(), collectionName, 4096L, materializer);
        return new MongoTimestampPersistence(collectionSource);
    }

    public Source<NotUsed, NotUsed> setTimestamp(Instant timestamp) {
        return this.setTaggedTimestamp(timestamp, null).map((Function & Serializable)done -> NotUsed.getInstance());
    }

    public Source<Done, NotUsed> setTaggedTimestamp(Instant timestamp, @Nullable String tag) {
        Document toStore = new Document().append(FIELD_TIMESTAMP, (Object)Date.from(timestamp)).append(FIELD_TAG, (Object)tag);
        return this.getCollection().flatMapConcat((Function & Serializable)collection -> Source.fromPublisher((Publisher)collection.insertOne((Object)toStore))).map((Function & Serializable)success -> {
            LOGGER.debug("Successfully inserted <{}> tagged <{}>.", (Object)timestamp, (Object)tag);
            return Done.done();
        });
    }

    Source<MongoCollection<Document>, NotUsed> getCollection() {
        return this.collectionSource.take(1L).map((Function & Serializable)document -> document);
    }

    public Source<Optional<Instant>, NotUsed> getTimestampAsync() {
        return this.getTaggedTimestamp().map((Function & Serializable)optional -> optional.map(Pair::first));
    }

    public Source<Optional<Pair<Instant, String>>, NotUsed> getTaggedTimestamp() {
        return this.getCollection().flatMapConcat((Function & Serializable)collection -> Source.fromPublisher((Publisher)collection.find().sort((Bson)SORT_BY_ID_DESC).limit(1))).flatMapConcat((Function & Serializable)doc -> {
            Date date = doc.getDate((Object)FIELD_TIMESTAMP);
            Instant timestamp = date.toInstant();
            String tag = doc.getString((Object)FIELD_TAG);
            LOGGER.debug("Returning timestamp <{}> tagged <{}>.", (Object)timestamp, (Object)tag);
            return Source.single(Optional.of(Pair.create((Object)timestamp, (Object)tag)));
        }).orElse((Graph)Source.single(Optional.empty()));
    }

    private static Source<MongoCollection, NotUsed> createOrGetCappedCollection(MongoDatabase database, String collectionName, long cappedCollectionSizeInBytes, Materializer materializer) {
        Source<Done, NotUsed> createCollectionSource = MongoTimestampPersistence.repeatableCreateCappedCollectionSource(database, collectionName, cappedCollectionSizeInBytes);
        Source infiniteCollectionSource = createCollectionSource.map((Function & Serializable)success -> database.getCollection(collectionName)).flatMapConcat(Source::repeat);
        Source restartSource = RestartSource.withBackoff((Duration)BACKOFF_MIN, (Duration)BACKOFF_MAX, (double)1.0, (Creator & Serializable)() -> infiniteCollectionSource);
        return (Source)restartSource.runWith((Graph)BroadcastHub.of(MongoCollection.class, (int)1), materializer);
    }

    private static Source<Done, NotUsed> repeatableCreateCappedCollectionSource(MongoDatabase database, String collectionName, long cappedCollectionSizeInBytes) {
        CreateCollectionOptions collectionOptions = new CreateCollectionOptions().capped(true).sizeInBytes(cappedCollectionSizeInBytes).maxDocuments(1L);
        return Source.lazySource((Creator & Serializable)() -> Source.fromPublisher((Publisher)database.createCollection(collectionName, collectionOptions))).mapMaterializedValue((Function & Serializable)whatever -> NotUsed.getInstance()).map((Function & Serializable)nullValue -> Done.done()).withAttributes(Attributes.inputBuffer((int)1, (int)1)).recoverWithRetries(1, new PFBuilder().match(MongoCommandException.class, MongoTimestampPersistence::isCollectionAlreadyExistsError, error -> Source.single((Object)Done.done())).build());
    }

    private static boolean isCollectionAlreadyExistsError(MongoCommandException error) {
        return error.getErrorCode() == 48;
    }
}

