/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.internal.utils.persistence.mongo;

import akka.NotUsed;
import akka.event.LoggingAdapter;
import akka.japi.function.Function;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import org.eclipse.ditto.internal.models.streaming.BatchedEntityIdWithRevisions;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.internal.models.streaming.SudoStreamPids;
import org.eclipse.ditto.internal.utils.akka.streaming.AbstractStreamingActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

@AllValuesAreNonnullByDefault
public abstract class AbstractPersistenceStreamingActor<T extends EntityIdWithRevision<?>>
extends AbstractStreamingActor<SudoStreamPids, T> {
    private final java.util.function.Function<PidWithSeqNr, T> entityMapper;
    private final java.util.function.Function<EntityIdWithRevision<?>, PidWithSeqNr> entityUnmapper;
    private final DittoMongoClient mongoClient;
    private final MongoReadJournal readJournal;

    protected AbstractPersistenceStreamingActor(java.util.function.Function<PidWithSeqNr, T> entityMapper, java.util.function.Function<EntityIdWithRevision<?>, PidWithSeqNr> entityUnmapper) {
        this.entityMapper = Objects.requireNonNull(entityMapper);
        this.entityUnmapper = entityUnmapper;
        Config config = this.getContext().getSystem().settings().config();
        DefaultMongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of((Config)DefaultScopedConfig.dittoScoped((Config)config));
        this.mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
        this.readJournal = MongoReadJournal.newInstance(config, this.mongoClient, this.getContext().getSystem());
    }

    protected AbstractPersistenceStreamingActor(java.util.function.Function<PidWithSeqNr, T> entityMapper, java.util.function.Function<EntityIdWithRevision<?>, PidWithSeqNr> entityUnmapper, MongoReadJournal readJournal) {
        this.entityMapper = Objects.requireNonNull(entityMapper);
        this.entityUnmapper = entityUnmapper;
        Config config = this.getContext().getSystem().settings().config();
        DefaultMongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of((Config)DefaultScopedConfig.dittoScoped((Config)config));
        this.mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
        this.readJournal = readJournal;
    }

    public void postStop() throws Exception {
        this.mongoClient.close();
        super.postStop();
    }

    protected abstract Class<T> getElementClass();

    protected final Class<SudoStreamPids> getCommandClass() {
        return SudoStreamPids.class;
    }

    protected int getBurst(SudoStreamPids command) {
        return command.getBurst();
    }

    protected Duration getInitialTimeout(SudoStreamPids command) {
        return Duration.ofMillis(command.getTimeoutMillis());
    }

    protected Duration getIdleTimeout(SudoStreamPids command) {
        return Duration.ofMillis(command.getTimeoutMillis());
    }

    protected Object batchMessages(List<T> elements) {
        return BatchedEntityIdWithRevisions.of(this.getElementClass(), elements);
    }

    protected final Source<T, NotUsed> createSource(SudoStreamPids command) {
        Source<String, NotUsed> pidSource;
        this.log.info("Starting stream for <{}>", (Object)command);
        Duration maxIdleTime = Duration.ofMillis(command.getTimeoutMillis());
        int batchSize = command.getBurst() * 5;
        if (command.hasNonEmptyLowerBound()) {
            PidWithSeqNr pidWithSeqNr = this.entityUnmapper.apply(command.getLowerBound());
            pidSource = this.readJournal.getJournalPidsAbove(pidWithSeqNr.getPersistenceId(), batchSize, this.materializer);
        } else {
            pidSource = this.readJournal.getJournalPids(batchSize, maxIdleTime, this.materializer);
        }
        return pidSource.map((Function & Serializable)pid -> this.mapEntity(new PidWithSeqNr((String)pid, 0L))).log("pid-streaming", (LoggingAdapter)this.log);
    }

    private T mapEntity(PidWithSeqNr pidWithSeqNr) {
        return (T)((EntityIdWithRevision)this.entityMapper.apply(pidWithSeqNr));
    }
}

