package io.activej.crdt.wal;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.common.ApplicationSettings;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.function.FunctionEx;
import io.activej.common.initializer.WithInitializer;
import io.activej.crdt.CrdtData;
import io.activej.crdt.function.CrdtFunction;
import io.activej.crdt.primitives.CrdtType;
import io.activej.crdt.storage.CrdtStorage;
import io.activej.crdt.util.CrdtDataSerializer;
import io.activej.crdt.util.Utils;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.file.ChannelFileReader;
import io.activej.csp.process.frames.ChannelFrameDecoder;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.processor.StreamReducer;
import io.activej.datastream.processor.StreamReducers;
import io.activej.datastream.processor.StreamSorter;
import io.activej.datastream.processor.StreamSorterStorageImpl;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanWithStats;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.ValueStats;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import java.io.IOException;
import java.lang.Comparable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/crdt/wal/WalUploader.class */
/**
 * Uploads the contents of write-ahead log (WAL) files into a {@link CrdtStorage}.
 *
 * <p>One upload run lists the WAL files under {@code path}, decodes and deserializes every
 * file into a stream of {@link CrdtData}, reduces the per-file streams by key (merging states
 * with the configured {@link CrdtFunction}), passes the result through a {@link StreamSorter}
 * and streams it to {@link CrdtStorage#upload()}. After a successful upload the processed WAL
 * files are deleted.
 *
 * <p>Upload statistics are exposed through JMX attributes and operations.
 *
 * @param <K> key type, ordered by its natural ordering
 * @param <S> CRDT state type
 */
public final class WalUploader<K extends Comparable<K>, S> implements EventloopJmxBeanWithStats, WithInitializer<WalUploader<K, S>> {
    private static final Logger logger = LoggerFactory.getLogger(WalUploader.class);
    private static final int DEFAULT_SORT_ITEMS_IN_MEMORY = ApplicationSettings.getInt(WalUploader.class, "sortItemsInMemory", 100000);
    private static final Duration SMOOTHING_WINDOW = ApplicationSettings.getDuration(WalUploader.class, "smoothingWindow", Duration.ofMinutes(5));
    private final Eventloop eventloop;
    private final Executor executor;
    private final Path path;
    private final CrdtFunction<S> function;
    private final CrdtDataSerializer<K, S> serializer;
    private final CrdtStorage<K, S> storage;
    // Toggled through the JMX operations below; when set, per-file sizes are recorded after an upload.
    private boolean detailedMonitoring;

    // Explicitly configured sort directory; when null a temporary directory is created per upload.
    @Nullable
    private Path sortDir;
    // NOTE(review): coalesce presumably merges overlapping upload requests into one run - confirm against AsyncRunnables.
    private final AsyncRunnable uploadToStorage = AsyncRunnables.coalesce(this::doUploadToStorage);
    private final PromiseStats uploadPromise = PromiseStats.create(SMOOTHING_WINDOW);
    private final ValueStats totalFilesUploaded = ValueStats.create(SMOOTHING_WINDOW);
    private final ValueStats totalFilesUploadedSize = ValueStats.create(SMOOTHING_WINDOW).withUnit("bytes");
    private int sortItemsInMemory = DEFAULT_SORT_ITEMS_IN_MEMORY;

    /**
     * Reducer that folds all {@link CrdtData} items sharing a key into a single item by
     * merging their states with the enclosing uploader's {@link CrdtFunction}.
     *
     * <p>Bridge methods are intentionally not declared by hand: the compiler generates them.
     */
    public final class WalReducer implements StreamReducers.Reducer<K, CrdtData<K, S>, CrdtData<K, S>, CrdtData<K, S>> {
        private WalReducer() {
        }

        /**
         * Starts accumulation for a key: the first item becomes the accumulator unchanged.
         */
        @Override
        public CrdtData<K, S> onFirstItem(StreamDataAcceptor<CrdtData<K, S>> stream, K key, CrdtData<K, S> firstValue) {
            return firstValue;
        }

        /**
         * Merges the accumulated state with the state of the next item for the same key.
         *
         * @return a new {@link CrdtData} holding {@code merge(accumulator state, next state)}
         */
        @Override
        public CrdtData<K, S> onNextItem(StreamDataAcceptor<CrdtData<K, S>> stream, K key, CrdtData<K, S> nextValue, CrdtData<K, S> accumulator) {
            S merged = WalUploader.this.function.merge(accumulator.getState(), nextValue.getState());
            return new CrdtData<>(key, merged);
        }

        /**
         * Emits the fully merged item for a key downstream.
         */
        @Override
        public void onComplete(StreamDataAcceptor<CrdtData<K, S>> stream, K key, CrdtData<K, S> accumulator) {
            stream.accept(accumulator);
        }
    }

    private WalUploader(Eventloop eventloop, Executor executor, Path path, CrdtFunction<S> crdtFunction, CrdtDataSerializer<K, S> crdtDataSerializer, CrdtStorage<K, S> crdtStorage) {
        this.eventloop = eventloop;
        this.executor = executor;
        this.path = path;
        this.function = crdtFunction;
        this.serializer = crdtDataSerializer;
        this.storage = crdtStorage;
    }

    /**
     * Creates an uploader that merges states with an explicitly supplied {@link CrdtFunction}.
     *
     * @param eventloop          eventloop returned by {@link #getEventloop()}
     * @param executor           executor handed to the file listing, reading, sorting and deleting helpers
     * @param path               directory that is scanned for WAL files
     * @param crdtFunction       function used to merge states that share a key
     * @param crdtDataSerializer serializer used to read WAL entries and by the sorter storage
     * @param crdtStorage        destination storage
     */
    public static <K extends Comparable<K>, S> WalUploader<K, S> create(Eventloop eventloop, Executor executor, Path path, CrdtFunction<S> crdtFunction, CrdtDataSerializer<K, S> crdtDataSerializer, CrdtStorage<K, S> crdtStorage) {
        return new WalUploader<>(eventloop, executor, path, crdtFunction, crdtDataSerializer, crdtStorage);
    }

    /**
     * Creates an uploader for states that are themselves {@link CrdtType}s; the merge function
     * is obtained from {@link CrdtFunction#ofCrdtType()}.
     */
    public static <K extends Comparable<K>, S extends CrdtType<S>> WalUploader<K, S> create(Eventloop eventloop, Executor executor, Path path, CrdtDataSerializer<K, S> crdtDataSerializer, CrdtStorage<K, S> crdtStorage) {
        return new WalUploader<>(eventloop, executor, path, CrdtFunction.ofCrdtType(), crdtDataSerializer, crdtStorage);
    }

    /**
     * Sets the directory used by the sorter. A directory set here is never deleted by this class.
     *
     * @return this uploader, for chaining
     */
    public WalUploader<K, S> withSortDir(Path path) {
        this.sortDir = path;
        return this;
    }

    /**
     * Sets the item count passed to the sorter as its in-memory limit.
     *
     * @return this uploader, for chaining
     */
    public WalUploader<K, S> withSortItemsInMemory(int i) {
        this.sortItemsInMemory = i;
        return this;
    }

    /**
     * Triggers an upload of the currently present WAL files and records the outcome in the
     * {@code uploadPromise} statistics.
     *
     * @return a promise that completes when the upload run finishes
     */
    public Promise<Void> uploadToStorage() {
        return this.uploadToStorage.run().whenComplete(this.uploadPromise.recordStats());
    }

    /** Lists the WAL files under {@code path} and uploads them. */
    private Promise<Void> doUploadToStorage() {
        return Utils.getWalFiles(this.executor, this.path).then(this::uploadWaLFiles);
    }

    /**
     * Streams the given WAL files into the storage and deletes them once the upload succeeded.
     *
     * @param list WAL files to upload; an empty list completes immediately
     * @throws IOException if the sort directory cannot be created
     */
    private Promise<Void> uploadWaLFiles(List<Path> list) throws IOException {
        if (list.isEmpty()) {
            logger.info("Nothing to upload");
            return Promise.complete();
        }
        if (logger.isInfoEnabled()) {
            logger.info("Uploading write ahead logs {}", list.stream().map(Path::getFileName).collect(Collectors.toList()));
        }

        // Only the directory creation can raise IOException, so only it is guarded.
        final Path sortDirectory;
        try {
            sortDirectory = createSortDir();
        } catch (IOException e) {
            logger.warn("Failed to create temporary sort directory", e);
            throw e;
        }

        return createReducer(list).getOutput()
                .transformWith(createSorter(sortDirectory))
                .streamTo(this.storage.upload())
                // Runs on success and on failure alike.
                .whenComplete(() -> cleanup(sortDirectory))
                .whenResult(() -> recordUploadStats(list))
                // WAL files are removed only after the upload completed successfully.
                .then(() -> Utils.deleteWalFiles(this.executor, list));
    }

    /** Records the number of uploaded files and, with detailed monitoring on, each file's size. */
    private void recordUploadStats(List<Path> uploaded) {
        this.totalFilesUploaded.recordValue(uploaded.size());
        if (!this.detailedMonitoring) {
            return;
        }
        for (Path walFile : uploaded) {
            try {
                this.totalFilesUploadedSize.recordValue(Files.size(walFile));
            } catch (IOException e) {
                logger.warn("Could not get the size of uploaded file {}", walFile, e);
            }
        }
    }

    /** Returns the configured sort directory, or a freshly created temporary one if none is set. */
    private Path createSortDir() throws IOException {
        if (this.sortDir != null) {
            return this.sortDir;
        }
        return Files.createTempDirectory("crdt-wal-sort-dir");
    }

    /** Builds a sorter keyed by {@link CrdtData#getKey()} in natural order, backed by files in {@code path}. */
    private StreamSorter<K, CrdtData<K, S>> createSorter(Path path) {
        return StreamSorter.create(
                StreamSorterStorageImpl.create(this.executor, this.serializer, FileWriteAheadLog.FRAME_FORMAT, path),
                CrdtData::getKey,
                Comparator.naturalOrder(),
                false,
                this.sortItemsInMemory);
    }

    /**
     * Deletes the sort directory if it was a temporary one created by this class. Deletion is
     * best-effort: a failure is logged and otherwise ignored so it cannot fail the upload.
     */
    private void cleanup(Path path) {
        if (this.sortDir != null) {
            // User-supplied directory: leave it in place.
            return;
        }
        try {
            Files.deleteIfExists(path);
        } catch (IOException e) {
            logger.warn("Failed to delete temporary sort directory {}", path, e);
        }
    }

    /**
     * Creates a reducer with one input per WAL file. Each file is read, frame-decoded and
     * deserialized; a {@link TruncatedDataException} at the end of a file is logged and treated
     * as a normal end of stream, any other error is propagated.
     */
    private StreamReducer<K, CrdtData<K, S>, CrdtData<K, S>> createReducer(List<Path> list) {
        StreamReducer<K, CrdtData<K, S>, CrdtData<K, S>> reducer = StreamReducer.create();
        for (Path walFile : list) {
            ChannelSupplier.ofPromise(ChannelFileReader.open(this.executor, walFile))
                    .transformWith(ChannelFrameDecoder.create(FileWriteAheadLog.FRAME_FORMAT))
                    .withEndOfStream(endOfStream -> endOfStream.map(FunctionEx.identity(), exc -> {
                        if (!(exc instanceof TruncatedDataException)) {
                            throw exc;
                        }
                        logger.warn("Write ahead log {} was truncated", walFile);
                        return null;
                    }))
                    .transformWith(ChannelDeserializer.create(this.serializer))
                    .streamTo(reducer.newInput(CrdtData::getKey, new WalReducer()));
        }
        return reducer;
    }

    /** Returns the eventloop this uploader was created with. */
    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    /** Statistics of the promises returned by {@link #uploadToStorage()}. */
    @JmxAttribute
    public PromiseStats getUploadPromise() {
        return this.uploadPromise;
    }

    /** Statistics of the number of WAL files per successful upload. */
    @JmxAttribute
    public ValueStats getTotalFilesUploaded() {
        return this.totalFilesUploaded;
    }

    /** Statistics of uploaded WAL file sizes in bytes; populated only while detailed monitoring is on. */
    @JmxAttribute
    public ValueStats getTotalFilesUploadedSize() {
        return this.totalFilesUploadedSize;
    }

    /** Tells whether per-file size recording is currently enabled. */
    @JmxAttribute
    public boolean isDetailedMonitoring() {
        return this.detailedMonitoring;
    }

    /** Enables per-file size recording. */
    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedMonitoring = true;
    }

    /** Disables per-file size recording. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedMonitoring = false;
    }
}
