package org.opendaylight.controller.akka.segjournal;

import akka.actor.ActorRef;
import akka.dispatch.Futures;
import akka.persistence.AtomicWrite;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.japi.AsyncWriteJournal;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;

/* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.class */
public class SegmentedFileJournal extends AsyncWriteJournal {
    public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
    public static final String STORAGE_MAX_ENTRY_SIZE = "max-entry-size";
    public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16777216;
    public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
    public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = 134217728;
    public static final String STORAGE_MAX_UNFLUSHED_BYTES = "max-unflushed-bytes";
    public static final int STORAGE_MAX_UNFLUSHED_BYTES_DEFAULT = 0;
    public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
    private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
    private final Map<String, ActorRef> handlers = new HashMap();
    private final File rootDir;
    private final StorageLevel storage;
    private final int maxEntrySize;
    private final int maxSegmentSize;
    private final int maxUnflushedBytes;

    public SegmentedFileJournal(Config config) {
        this.rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
        if (!this.rootDir.exists()) {
            LOG.debug("Creating directory {}", this.rootDir);
            Preconditions.checkState(this.rootDir.mkdirs(), "Failed to create root directory %s", this.rootDir);
        }
        Preconditions.checkArgument(this.rootDir.isDirectory(), "%s is not a directory", this.rootDir);
        this.maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
        this.maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
        this.maxUnflushedBytes = getBytes(config, STORAGE_MAX_UNFLUSHED_BYTES, 0);
        if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
            this.storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
        } else {
            this.storage = StorageLevel.DISK;
        }
        LOG.info("Initialized with root directory {} with storage {}", this.rootDir, this.storage);
    }

    public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> iterable) {
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        for (AtomicWrite atomicWrite : iterable) {
            arrayList.add(((SegmentedJournalActor.WriteMessages) hashMap.computeIfAbsent(this.handlers.computeIfAbsent(atomicWrite.persistenceId(), this::createHandler), actorRef -> {
                return new SegmentedJournalActor.WriteMessages();
            })).add(atomicWrite));
        }
        hashMap.forEach((actorRef2, writeMessages) -> {
            LOG.trace("Sending {} to {}", writeMessages, actorRef2);
            actorRef2.tell(writeMessages, ActorRef.noSender());
        });
        return Futures.sequence(arrayList, context().dispatcher());
    }

    public Future<Void> doAsyncDeleteMessagesTo(String str, long j) {
        return delegateMessage(str, SegmentedJournalActor.deleteMessagesTo(j));
    }

    public Future<Void> doAsyncReplayMessages(String str, long j, long j2, long j3, Consumer<PersistentRepr> consumer) {
        return delegateMessage(str, SegmentedJournalActor.replayMessages(j, j2, j3, consumer));
    }

    public Future<Long> doAsyncReadHighestSequenceNr(String str, long j) {
        return delegateMessage(this.handlers.computeIfAbsent(str, this::createHandler), SegmentedJournalActor.readHighestSequenceNr(j));
    }

    private ActorRef createHandler(String str) {
        File file = new File(this.rootDir, URLEncoder.encode(str, StandardCharsets.UTF_8));
        LOG.debug("Creating handler for {} in directory {}", str, file);
        ActorRef actorOf = context().actorOf(SegmentedJournalActor.props(str, file, this.storage, this.maxEntrySize, this.maxSegmentSize, this.maxUnflushedBytes));
        LOG.debug("Directory {} handled by {}", file, actorOf);
        return actorOf;
    }

    private <T> Future<T> delegateMessage(String str, SegmentedJournalActor.AsyncMessage<T> asyncMessage) {
        ActorRef actorRef = this.handlers.get(str);
        return actorRef == null ? Futures.failed(new IllegalStateException("Cannot find handler for " + str)) : delegateMessage(actorRef, asyncMessage);
    }

    private static <T> Future<T> delegateMessage(ActorRef actorRef, SegmentedJournalActor.AsyncMessage<T> asyncMessage) {
        LOG.trace("Delegating {} to {}", asyncMessage, actorRef);
        actorRef.tell(asyncMessage, ActorRef.noSender());
        return asyncMessage.promise.future();
    }

    private static int getBytes(Config config, String str, int i) {
        if (!config.hasPath(str)) {
            return i;
        }
        long longValue = config.getBytes(str).longValue();
        Preconditions.checkArgument(longValue <= 2147483647L, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
        return (int) longValue;
    }
}
