package org.eclipse.ditto.services.utils.persistence.mongo.streaming;

import akka.Done;
import akka.NotUsed;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.stream.Attributes;
import akka.stream.Materializer;
import akka.stream.javadsl.BroadcastHub;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Source;
import com.mongodb.MongoCommandException;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Optional;
import javax.annotation.Nullable;
import org.bson.Document;
import org.eclipse.ditto.services.utils.akka.streaming.TimestampPersistence;
import org.eclipse.ditto.services.utils.persistence.mongo.DittoMongoClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence.class */
public final class MongoTimestampPersistence implements TimestampPersistence {
    private static final long MIN_CAPPED_COLLECTION_SIZE_IN_BYTES = 4096;
    private static final int COLLECTION_ALREADY_EXISTS_ERROR_CODE = 48;
    private static final String FIELD_TIMESTAMP = "ts";
    private static final String FIELD_TAG = "tg";
    private final Source<MongoCollection, NotUsed> collectionSource;
    private static final Duration BACKOFF_MIN = Duration.ofSeconds(1);
    private static final Duration BACKOFF_MAX = Duration.ofMinutes(2);
    private static final Document SORT_BY_ID_DESC = new Document().append("_id", -1);
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoTimestampPersistence.class);

    private MongoTimestampPersistence(Source<MongoCollection, NotUsed> source) {
        this.collectionSource = source;
    }

    public static MongoTimestampPersistence initializedInstance(String str, DittoMongoClient dittoMongoClient, Materializer materializer) {
        return new MongoTimestampPersistence(createOrGetCappedCollection(dittoMongoClient.getDefaultDatabase(), str, MIN_CAPPED_COLLECTION_SIZE_IN_BYTES, materializer));
    }

    public Source<NotUsed, NotUsed> setTimestamp(Instant instant) {
        return setTaggedTimestamp(instant, null).map(done -> {
            return NotUsed.getInstance();
        });
    }

    public Source<Done, NotUsed> setTaggedTimestamp(Instant instant, @Nullable String str) {
        Document append = new Document().append(FIELD_TIMESTAMP, Date.from(instant)).append(FIELD_TAG, str);
        return getCollection().flatMapConcat(mongoCollection -> {
            return Source.fromPublisher(mongoCollection.insertOne(append));
        }).map(insertOneResult -> {
            LOGGER.debug("Successfully inserted <{}> tagged <{}>.", instant, str);
            return Done.done();
        });
    }

    Source<MongoCollection<Document>, NotUsed> getCollection() {
        return this.collectionSource.take(1L).map(mongoCollection -> {
            return mongoCollection;
        });
    }

    public Source<Optional<Instant>, NotUsed> getTimestampAsync() {
        return getTaggedTimestamp().map(optional -> {
            return optional.map((v0) -> {
                return v0.first();
            });
        });
    }

    public Source<Optional<Pair<Instant, String>>, NotUsed> getTaggedTimestamp() {
        return getCollection().flatMapConcat(mongoCollection -> {
            return Source.fromPublisher(mongoCollection.find().sort(SORT_BY_ID_DESC).limit(1));
        }).flatMapConcat(document -> {
            Instant instant = document.getDate(FIELD_TIMESTAMP).toInstant();
            String string = document.getString(FIELD_TAG);
            LOGGER.debug("Returning timestamp <{}> tagged <{}>.", instant, string);
            return Source.single(Optional.of(Pair.create(instant, string)));
        }).orElse(Source.single(Optional.empty()));
    }

    private static Source<MongoCollection, NotUsed> createOrGetCappedCollection(MongoDatabase mongoDatabase, String str, long j, Materializer materializer) {
        Source flatMapConcat = repeatableCreateCappedCollectionSource(mongoDatabase, str, j).map(done -> {
            return mongoDatabase.getCollection(str);
        }).flatMapConcat((v0) -> {
            return Source.repeat(v0);
        });
        return (Source) RestartSource.withBackoff(BACKOFF_MIN, BACKOFF_MAX, 1.0d, () -> {
            return flatMapConcat;
        }).runWith(BroadcastHub.of(MongoCollection.class, 1), materializer);
    }

    private static Source<Done, NotUsed> repeatableCreateCappedCollectionSource(MongoDatabase mongoDatabase, String str, long j) {
        CreateCollectionOptions maxDocuments = new CreateCollectionOptions().capped(true).sizeInBytes(j).maxDocuments(1L);
        return Source.lazySource(() -> {
            return Source.fromPublisher(mongoDatabase.createCollection(str, maxDocuments));
        }).mapMaterializedValue(completionStage -> {
            return NotUsed.getInstance();
        }).map(r2 -> {
            return Done.done();
        }).withAttributes(Attributes.inputBuffer(1, 1)).recoverWithRetries(1, new PFBuilder().match(MongoCommandException.class, MongoTimestampPersistence::isCollectionAlreadyExistsError, mongoCommandException -> {
            return Source.single(Done.done());
        }).build());
    }

    private static boolean isCollectionAlreadyExistsError(MongoCommandException mongoCommandException) {
        return mongoCommandException.getErrorCode() == COLLECTION_ALREADY_EXISTS_ERROR_CODE;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -934531685:
                if (implMethodName.equals("repeat")) {
                    z = 12;
                    break;
                }
                break;
            case -285732164:
                if (implMethodName.equals("lambda$createOrGetCappedCollection$9a5162ee$1")) {
                    z = true;
                    break;
                }
                break;
            case 11510777:
                if (implMethodName.equals("lambda$createOrGetCappedCollection$5e8c9517$1")) {
                    z = 11;
                    break;
                }
                break;
            case 130976026:
                if (implMethodName.equals("lambda$getTaggedTimestamp$bf05cc0a$1")) {
                    z = 8;
                    break;
                }
                break;
            case 130976027:
                if (implMethodName.equals("lambda$getTaggedTimestamp$bf05cc0a$2")) {
                    z = 10;
                    break;
                }
                break;
            case 266122581:
                if (implMethodName.equals("lambda$getCollection$e580f900$1")) {
                    z = 3;
                    break;
                }
                break;
            case 444191816:
                if (implMethodName.equals("lambda$getTimestampAsync$6b3ee16b$1")) {
                    z = 5;
                    break;
                }
                break;
            case 481828310:
                if (implMethodName.equals("lambda$repeatableCreateCappedCollectionSource$8777b9b5$1")) {
                    z = 4;
                    break;
                }
                break;
            case 938557176:
                if (implMethodName.equals("lambda$setTimestamp$fd6374cc$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1130598774:
                if (implMethodName.equals("lambda$setTaggedTimestamp$911595d6$1")) {
                    z = 9;
                    break;
                }
                break;
            case 1602784552:
                if (implMethodName.equals("lambda$setTaggedTimestamp$b999c4a5$1")) {
                    z = false;
                    break;
                }
                break;
            case 1964974092:
                if (implMethodName.equals("lambda$repeatableCreateCappedCollectionSource$c090ab71$1")) {
                    z = 7;
                    break;
                }
                break;
            case 1964974093:
                if (implMethodName.equals("lambda$repeatableCreateCappedCollectionSource$c090ab71$2")) {
                    z = 6;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Ljava/time/Instant;Ljava/lang/String;Lcom/mongodb/client/result/InsertOneResult;)Lakka/Done;")) {
                    Instant instant = (Instant) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    return insertOneResult -> {
                        LOGGER.debug("Successfully inserted <{}> tagged <{}>.", instant, str);
                        return Done.done();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Lcom/mongodb/reactivestreams/client/MongoDatabase;Ljava/lang/String;Lakka/Done;)Lcom/mongodb/reactivestreams/client/MongoCollection;")) {
                    MongoDatabase mongoDatabase = (MongoDatabase) serializedLambda.getCapturedArg(0);
                    String str2 = (String) serializedLambda.getCapturedArg(1);
                    return done -> {
                        return mongoDatabase.getCollection(str2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Lakka/Done;)Lakka/NotUsed;")) {
                    return done2 -> {
                        return NotUsed.getInstance();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Lcom/mongodb/reactivestreams/client/MongoCollection;)Lcom/mongodb/reactivestreams/client/MongoCollection;")) {
                    return mongoCollection -> {
                        return mongoCollection;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Lcom/mongodb/reactivestreams/client/MongoDatabase;Ljava/lang/String;Lcom/mongodb/client/model/CreateCollectionOptions;)Lakka/stream/javadsl/Source;")) {
                    MongoDatabase mongoDatabase2 = (MongoDatabase) serializedLambda.getCapturedArg(0);
                    String str3 = (String) serializedLambda.getCapturedArg(1);
                    CreateCollectionOptions createCollectionOptions = (CreateCollectionOptions) serializedLambda.getCapturedArg(2);
                    return () -> {
                        return Source.fromPublisher(mongoDatabase2.createCollection(str3, createCollectionOptions));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Optional;)Ljava/util/Optional;")) {
                    return optional -> {
                        return optional.map((v0) -> {
                            return v0.first();
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Lakka/Done;")) {
                    return r2 -> {
                        return Done.done();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CompletionStage;)Lakka/NotUsed;")) {
                    return completionStage -> {
                        return NotUsed.getInstance();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Lcom/mongodb/reactivestreams/client/MongoCollection;)Lakka/stream/Graph;")) {
                    return mongoCollection2 -> {
                        return Source.fromPublisher(mongoCollection2.find().sort(SORT_BY_ID_DESC).limit(1));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Lorg/bson/Document;Lcom/mongodb/reactivestreams/client/MongoCollection;)Lakka/stream/Graph;")) {
                    Document document = (Document) serializedLambda.getCapturedArg(0);
                    return mongoCollection3 -> {
                        return Source.fromPublisher(mongoCollection3.insertOne(document));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Lorg/bson/Document;)Lakka/stream/Graph;")) {
                    return document2 -> {
                        Instant instant2 = document2.getDate(FIELD_TIMESTAMP).toInstant();
                        String string = document2.getString(FIELD_TAG);
                        LOGGER.debug("Returning timestamp <{}> tagged <{}>.", instant2, string);
                        return Source.single(Optional.of(Pair.create(instant2, string)));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoTimestampPersistence") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;)Lakka/stream/javadsl/Source;")) {
                    Source source = (Source) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return source;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/stream/javadsl/Source") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Lakka/stream/javadsl/Source;")) {
                    return (v0) -> {
                        return Source.repeat(v0);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
