/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.internal.utils.persistence.mongo;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.Props;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.SourceRef;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamRefs;
import com.typesafe.config.Config;
import java.time.Duration;
import java.util.function.Function;
import org.bson.Document;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.internal.models.streaming.StreamedSnapshot;
import org.eclipse.ditto.internal.models.streaming.SudoStreamSnapshots;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

@AllValuesAreNonnullByDefault
public final class SnapshotStreamingActor
extends AbstractActor {
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter((Actor)this);
    private final Materializer materializer = Materializer.createMaterializer(() -> ((SnapshotStreamingActor)this).getContext());
    private final Function<String, EntityId> pid2EntityId;
    private final Function<EntityId, String> entityId2Pid;
    private final DittoMongoClient mongoClient;
    private final MongoReadJournal readJournal;

    private SnapshotStreamingActor(Function<String, EntityId> pid2EntityId, Function<EntityId, String> entityId2Pid, DittoMongoClient mongoClient, MongoReadJournal readJournal) {
        this.pid2EntityId = pid2EntityId;
        this.entityId2Pid = entityId2Pid;
        this.mongoClient = mongoClient;
        this.readJournal = readJournal;
    }

    private SnapshotStreamingActor(Function<String, EntityId> pid2EntityId, Function<EntityId, String> entityId2Pid) {
        this.pid2EntityId = pid2EntityId;
        this.entityId2Pid = entityId2Pid;
        Config config = this.getContext().getSystem().settings().config();
        DefaultMongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of((Config)DefaultScopedConfig.dittoScoped((Config)config));
        this.mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
        this.readJournal = MongoReadJournal.newInstance(config, this.mongoClient, this.getContext().getSystem());
    }

    public static Props props(Function<String, EntityId> pid2EntityId, Function<EntityId, String> entityId2Pid) {
        return Props.create(SnapshotStreamingActor.class, (Object[])new Object[]{pid2EntityId, entityId2Pid});
    }

    public static Props propsForTest(Function<String, EntityId> pid2EntityId, Function<EntityId, String> entityId2Pid, DittoMongoClient mongoClient, MongoReadJournal readJournal) {
        return Props.create(SnapshotStreamingActor.class, (Object[])new Object[]{pid2EntityId, entityId2Pid, mongoClient, readJournal});
    }

    public void postStop() throws Exception {
        this.mongoClient.close();
        super.postStop();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(SudoStreamSnapshots.class, this::startStreaming).matchAny(message -> this.log.warning("Unexpected message: <{}>", message)).build();
    }

    private Source<StreamedSnapshot, NotUsed> createSource(SudoStreamSnapshots command) {
        this.log.info("Starting stream for <{}>", (Object)command);
        int batchSize = command.getBurst();
        String start = command.hasNonEmptyLowerBound() ? this.entityId2Pid.apply(command.getLowerBound()) : "";
        Source<Document, NotUsed> snapshotSource = this.readJournal.getNewestSnapshotsAbove(start, batchSize, this.materializer, (String[])command.getSnapshotFields().stream().map(JsonValue::asString).toArray(String[]::new));
        return snapshotSource.map(this::mapSnapshot).log("snapshot-streaming", (LoggingAdapter)this.log);
    }

    private StreamedSnapshot mapSnapshot(Document snapshot) {
        EntityId entityId = this.pid2EntityId.apply(snapshot.getString((Object)MongoReadJournal.S_ID));
        snapshot.remove((Object)MongoReadJournal.S_ID);
        JsonObject snapshotJson = JsonObject.of((String)snapshot.toJson());
        return StreamedSnapshot.of((EntityId)entityId, (JsonObject)snapshotJson);
    }

    private void startStreaming(SudoStreamSnapshots command) {
        Duration timeout = Duration.ofMillis(command.getTimeoutMillis());
        SourceRef sourceRef = (SourceRef)this.createSource(command).initialTimeout(timeout).idleTimeout(timeout).runWith((Graph)StreamRefs.sourceRef(), this.materializer);
        this.getSender().tell((Object)sourceRef, this.getSelf());
    }
}

