package io.activej.crdt.wal;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.service.EventloopService;
import io.activej.async.util.LogUtils;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.initializer.WithInitializer;
import io.activej.crdt.CrdtData;
import io.activej.crdt.util.CrdtDataSerializer;
import io.activej.crdt.util.Utils;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.file.ChannelFileWriter;
import io.activej.csp.process.frames.ChannelFrameEncoder;
import io.activej.csp.process.frames.FrameFormat;
import io.activej.csp.process.frames.LZ4FrameFormat;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanWithStats;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import io.activej.promise.jmx.PromiseStats;
import java.io.IOException;
import java.lang.Comparable;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/crdt/wal/FileWriteAheadLog.class */
public class FileWriteAheadLog<K extends Comparable<K>, S> implements WriteAheadLog<K, S>, EventloopService, EventloopJmxBeanWithStats, WithInitializer<FileWriteAheadLog<K, S>> {
    private static final Logger logger;
    public static final String EXT_FINAL = ".wal";
    public static final String EXT_CURRENT = ".current";
    public static final FrameFormat FRAME_FORMAT;
    private static final Duration SMOOTHING_WINDOW;
    private final Eventloop eventloop;
    private final Executor executor;
    private final Path path;
    private final CrdtDataSerializer<K, S> serializer;
    private final FlushMode flushMode;
    private final WalUploader<K, S> uploader;
    private FileWriteAheadLog<K, S>.WalConsumer consumer;
    private boolean stopping;
    private boolean flushRequired;
    private boolean detailedMonitoring;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final AsyncRunnable flush = AsyncRunnables.coalesce(this::doFlush);
    private boolean scanLostFiles = true;
    private final PromiseStats putPromise = PromiseStats.create(SMOOTHING_WINDOW);
    private final PromiseStats flushPromise = PromiseStats.create(SMOOTHING_WINDOW);
    private final EventStats totalPuts = EventStats.create(SMOOTHING_WINDOW);
    private final EventStats totalFlushes = EventStats.create(SMOOTHING_WINDOW);
    private final ValueStats totalFlushedSize = ValueStats.create(SMOOTHING_WINDOW).withUnit("bytes");

    /* loaded from: input_file:io/activej/crdt/wal/FileWriteAheadLog$FlushMode.class */
    public enum FlushMode {
        UPLOAD_TO_STORAGE,
        ROTATE_FILE,
        ROTATE_FILE_AWAIT
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/crdt/wal/FileWriteAheadLog$WalConsumer.class */
    public final class WalConsumer {
        private final AbstractStreamSupplier<CrdtData<K, S>> internalSupplier = (AbstractStreamSupplier<CrdtData<K, S>>) new AbstractStreamSupplier<CrdtData<K, S>>() { // from class: io.activej.crdt.wal.FileWriteAheadLog.WalConsumer.1
            protected void onStarted() {
                resume();
            }
        };
        private final Path walFile;
        private SettablePromise<Void> writeCallback;

        public WalConsumer(Path path) {
            this.walFile = path;
            ChannelConsumer ofPromise = ChannelConsumer.ofPromise(ChannelFileWriter.open(FileWriteAheadLog.this.executor, path));
            this.internalSupplier.streamTo(StreamConsumer.ofSupplier(streamSupplier -> {
                return ((ChannelSupplier) ((ChannelSupplier) streamSupplier.transformWith(ChannelSerializer.create(FileWriteAheadLog.this.serializer).withAutoFlushInterval(Duration.ZERO))).transformWith(ChannelFrameEncoder.create(FileWriteAheadLog.FRAME_FORMAT))).streamTo(ChannelConsumer.of(byteBuf -> {
                    if (this.writeCallback == null) {
                        return ofPromise.accept(byteBuf);
                    }
                    SettablePromise<Void> settablePromise = this.writeCallback;
                    this.writeCallback = null;
                    Promise accept = ofPromise.accept(byteBuf);
                    Objects.requireNonNull(settablePromise);
                    return accept.whenComplete((v1, v2) -> {
                        r1.accept(v1, v2);
                    });
                }));
            }));
        }

        public Path getWalFile() {
            return this.walFile;
        }

        public Promise<Void> accept(CrdtData<K, S> crdtData) {
            if (this.writeCallback == null) {
                this.writeCallback = new SettablePromise<>();
            }
            SettablePromise<Void> settablePromise = this.writeCallback;
            this.internalSupplier.send(crdtData);
            return settablePromise;
        }

        public Promise<Void> finish() {
            this.internalSupplier.sendEndOfStream();
            return this.internalSupplier.getAcknowledgement();
        }
    }

    private FileWriteAheadLog(Eventloop eventloop, Executor executor, Path path, CrdtDataSerializer<K, S> crdtDataSerializer, FlushMode flushMode, @Nullable WalUploader<K, S> walUploader) {
        this.eventloop = eventloop;
        this.executor = executor;
        this.path = path;
        this.serializer = crdtDataSerializer;
        this.flushMode = flushMode;
        this.uploader = walUploader;
    }

    public static <K extends Comparable<K>, S> FileWriteAheadLog<K, S> create(Eventloop eventloop, Executor executor, Path path, CrdtDataSerializer<K, S> crdtDataSerializer, WalUploader<K, S> walUploader) {
        return new FileWriteAheadLog<>(eventloop, executor, path, crdtDataSerializer, FlushMode.UPLOAD_TO_STORAGE, walUploader);
    }

    public static <K extends Comparable<K>, S> FileWriteAheadLog<K, S> create(Eventloop eventloop, Executor executor, Path path, CrdtDataSerializer<K, S> crdtDataSerializer, FlushMode flushMode) {
        Checks.checkArgument(flushMode == FlushMode.ROTATE_FILE || flushMode == FlushMode.ROTATE_FILE_AWAIT);
        return new FileWriteAheadLog<>(eventloop, executor, path, crdtDataSerializer, flushMode, null);
    }

    public FlushMode getFlushMode() {
        return this.flushMode;
    }

    @Override // io.activej.crdt.wal.WriteAheadLog
    public Promise<Void> put(K k, S s) {
        logger.trace("Putting value {} at key {}", s, k);
        this.totalPuts.recordEvent();
        this.flushRequired = true;
        return this.consumer.accept(new CrdtData<>(k, s)).whenComplete(this.putPromise.recordStats());
    }

    @Override // io.activej.crdt.wal.WriteAheadLog
    public Promise<Void> flush() {
        logger.trace("Flush called");
        return this.flush.run().whenComplete(this.flushPromise.recordStats());
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @NotNull
    public Promise<Void> start() {
        return scanLostFiles().then(this::flushFiles).whenResult(() -> {
            this.consumer = createConsumer();
        });
    }

    @NotNull
    public Promise<?> stop() {
        this.stopping = true;
        return this.flushRequired ? flush() : Utils.deleteWalFiles(this.executor, Collections.singleton(((WalConsumer) this.consumer).walFile));
    }

    @Nullable
    private FileWriteAheadLog<K, S>.WalConsumer createConsumer() {
        if (this.stopping) {
            return null;
        }
        return new WalConsumer(this.path.resolve(UUID.randomUUID() + EXT_CURRENT));
    }

    private Promise<Void> doFlush() {
        if (!this.flushRequired) {
            logger.trace("Nothing to flush");
            return Promise.complete();
        }
        this.flushRequired = false;
        this.totalFlushes.recordEvent();
        logger.trace("Begin flushing write ahead log");
        FileWriteAheadLog<K, S>.WalConsumer walConsumer = this.consumer;
        this.consumer = createConsumer();
        if (this.detailedMonitoring) {
            try {
                this.totalFlushedSize.recordValue(Files.size(((WalConsumer) walConsumer).walFile));
            } catch (IOException e) {
                logger.warn("Could not get the size of flushed file {}", ((WalConsumer) walConsumer).walFile);
            }
        }
        return walConsumer.finish().then(() -> {
            return Promise.ofBlocking(this.executor, () -> {
                rename(walConsumer.walFile);
            }).whenException(exc -> {
                this.scanLostFiles = true;
            });
        }).then(this::scanLostFiles).then(this::flushFiles).whenException(exc -> {
            this.flushRequired = true;
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.Level.TRACE, LogUtils.Level.TRACE, "doFlush", new Object[]{this}));
    }

    private Promise<Void> flushFiles() {
        if (this.flushMode == FlushMode.ROTATE_FILE) {
            return Promise.complete();
        }
        if (this.flushMode == FlushMode.ROTATE_FILE_AWAIT) {
            return awaitExternalFlush();
        }
        if ($assertionsDisabled || (this.flushMode == FlushMode.UPLOAD_TO_STORAGE && this.uploader != null)) {
            return this.uploader.uploadToStorage();
        }
        throw new AssertionError();
    }

    private Promise<Void> scanLostFiles() {
        return !this.scanLostFiles ? Promise.complete() : getLostFiles().then(list -> {
            return Promise.ofBlocking(this.executor, () -> {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    rename((Path) it.next());
                }
            });
        }).whenResult(() -> {
            this.scanLostFiles = false;
        });
    }

    private void rename(Path path) throws IOException {
        if (!$assertionsDisabled && !path.toString().endsWith(EXT_CURRENT)) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && this.consumer != null && path.equals(this.consumer.getWalFile())) {
            throw new AssertionError();
        }
        Path resolveSibling = path.resolveSibling(path.getFileName().toString().replace(EXT_CURRENT, EXT_FINAL));
        try {
            Files.move(path, resolveSibling, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(path, resolveSibling, new CopyOption[0]);
        }
    }

    private Promise<Void> awaitExternalFlush() {
        return Utils.getWalFiles(this.executor, this.path).thenIfElse((v0) -> {
            return v0.isEmpty();
        }, list -> {
            return Promise.complete();
        }, list2 -> {
            return Promises.delay(Duration.ofSeconds(1L)).then(this::awaitExternalFlush);
        });
    }

    private Promise<List<Path>> getLostFiles() {
        return Promise.ofBlocking(this.executor, () -> {
            Stream<Path> list = Files.list(this.path);
            try {
                List list2 = (List) list.filter(path -> {
                    return Files.isRegularFile(path, new LinkOption[0]) && path.toString().endsWith(EXT_CURRENT) && (this.consumer == null || !path.equals(this.consumer.getWalFile()));
                }).collect(Collectors.toList());
                if (list != null) {
                    list.close();
                }
                return list2;
            } catch (Throwable th) {
                if (list != null) {
                    try {
                        list.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }).whenResult(list -> {
            if (logger.isTraceEnabled()) {
                logger.trace("Found {} lost files {}", Integer.valueOf(list.size()), list.stream().map((v0) -> {
                    return v0.getFileName();
                }).collect(Collectors.toList()));
            }
        });
    }

    @JmxAttribute
    public PromiseStats getPutPromise() {
        return this.putPromise;
    }

    @JmxAttribute
    public PromiseStats getFlushPromise() {
        return this.flushPromise;
    }

    @JmxAttribute
    public EventStats getTotalPuts() {
        return this.totalPuts;
    }

    @JmxAttribute
    public EventStats getTotalFlushes() {
        return this.totalFlushes;
    }

    @JmxAttribute
    public ValueStats getTotalFlushedSize() {
        return this.totalFlushedSize;
    }

    @JmxAttribute
    public boolean isDetailedMonitoring() {
        return this.detailedMonitoring;
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedMonitoring = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedMonitoring = false;
    }

    static {
        $assertionsDisabled = !FileWriteAheadLog.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(FileWriteAheadLog.class);
        FRAME_FORMAT = LZ4FrameFormat.create();
        SMOOTHING_WINDOW = ApplicationSettings.getDuration(FileWriteAheadLog.class, "smoothingWindow", Duration.ofMinutes(5L));
    }
}
