/*
 * Decompiled with CFR 0.152.
 */
package io.activej.multilog;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.exception.parse.TruncatedDataException;
import io.activej.common.time.CurrentTimeProvider;
import io.activej.common.time.Stopwatch;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.ChannelSupplierTransformer;
import io.activej.csp.process.ChannelLZ4Compressor;
import io.activej.csp.process.ChannelLZ4Decompressor;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.StreamSupplierWithResult;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.processor.StreamSupplierTransformer;
import io.activej.datastream.stats.StreamRegistry;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.datastream.stats.StreamStatsSizeCounter;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanEx;
import io.activej.fs.ActiveFs;
import io.activej.multilog.LogFile;
import io.activej.multilog.LogNamingScheme;
import io.activej.multilog.LogPosition;
import io.activej.multilog.LogStreamChunker;
import io.activej.multilog.Multilog;
import io.activej.multilog.PartitionAndFile;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.serializer.BinarySerializer;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Multilog} implementation backed by an {@link ActiveFs}.
 *
 * <p>On the write path items are serialized with the supplied {@link BinarySerializer},
 * LZ4-compressed and handed to a {@link LogStreamChunker}. On the read path the files of a
 * partition are listed through the {@link LogNamingScheme}, downloaded one after another,
 * LZ4-decompressed and deserialized into a single concatenated stream.
 *
 * <p>Partition names must not contain a dash ({@code -}); see {@link #validateLogPartition}.
 *
 * @param <T> type of the logged items
 */
public final class MultilogImpl<T> implements Multilog<T>, EventloopJmxBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(MultilogImpl.class);

    /** Serializer buffer size used unless overridden with {@code withBufferSize}. */
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256L);

    private final Eventloop eventloop;
    private final ActiveFs fs;
    private final LogNamingScheme namingScheme;
    private final BinarySerializer<T> serializer;

    private MemSize bufferSize = DEFAULT_BUFFER_SIZE;
    // null is passed through to the serializer unchanged when no interval was configured
    private Duration autoFlushInterval = null;

    private final StreamRegistry<String> streamReads = StreamRegistry.create();
    private final StreamRegistry<String> streamWrites = StreamRegistry.create();
    private final StreamStatsDetailed<ByteBuf> streamReadStats = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> streamWriteStats = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());

    private MultilogImpl(Eventloop eventloop, ActiveFs fs, BinarySerializer<T> serializer, LogNamingScheme namingScheme) {
        this.eventloop = eventloop;
        this.fs = fs;
        this.serializer = serializer;
        this.namingScheme = namingScheme;
    }

    /**
     * Creates a multilog with {@link #DEFAULT_BUFFER_SIZE} and no auto-flush interval.
     *
     * @param eventloop    event loop; also used as the time provider of the {@link LogStreamChunker}
     * @param fs           file system the log files are listed in, downloaded from and written to
     * @param serializer   serializer for the logged items
     * @param namingScheme maps partitions and log files to file names and back
     * @return a new instance
     */
    public static <T> MultilogImpl<T> create(Eventloop eventloop, ActiveFs fs, BinarySerializer<T> serializer, LogNamingScheme namingScheme) {
        return new MultilogImpl<>(eventloop, fs, serializer, namingScheme);
    }

    /**
     * Sets the initial serializer buffer size.
     *
     * @param bufferSize size, converted with {@code MemSize.of(long)}
     * @return this instance
     */
    public MultilogImpl<T> withBufferSize(int bufferSize) {
        this.bufferSize = MemSize.of(bufferSize);
        return this;
    }

    /**
     * Sets the initial serializer buffer size.
     *
     * @param bufferSize buffer size
     * @return this instance
     */
    public MultilogImpl<T> withBufferSize(MemSize bufferSize) {
        this.bufferSize = bufferSize;
        return this;
    }

    /**
     * Sets the auto-flush interval passed to the serializer on the write path.
     *
     * @param autoFlushInterval interval to pass on
     * @return this instance
     */
    public MultilogImpl<T> withAutoFlushInterval(Duration autoFlushInterval) {
        this.autoFlushInterval = autoFlushInterval;
        return this;
    }

    /**
     * Creates a consumer that appends items to the given partition.
     *
     * <p>Pipeline: serialize (serialization errors are skipped) -> LZ4 fast compression ->
     * write registry/statistics -> {@link LogStreamChunker}.
     *
     * @param logPartition partition name, must not contain {@code -}
     * @return an already completed promise of the consumer
     */
    @Override
    public Promise<StreamConsumer<T>> write(@NotNull String logPartition) {
        validateLogPartition(logPartition);

        return Promise.of(StreamConsumer.<T>ofSupplier(
                supplier -> supplier
                        .transformWith(ChannelSerializer.create(serializer)
                                .withAutoFlushInterval(autoFlushInterval)
                                .withInitialBufferSize(bufferSize)
                                .withSkipSerializationErrors())
                        .transformWith(ChannelLZ4Compressor.createFastCompressor())
                        .transformWith(streamWrites.register(logPartition))
                        .transformWith(streamWriteStats)
                        .bindTo(new LogStreamChunker(eventloop, fs, namingScheme, logPartition))));
    }

    /**
     * Creates a supplier that streams the items of a partition, starting at the given position.
     *
     * <p>Files are listed with the naming scheme's glob; names that cannot be parsed or that
     * belong to another partition are ignored. The remaining files are restricted to the
     * inclusive range {@code [startLogFile, endLogFile]} and read in sorted order. Only the
     * start file is read from {@code startOffset}; every later file is read from offset 0.
     *
     * @param logPartition partition name, must not contain {@code -}
     * @param startLogFile first log file to read
     * @param startOffset  offset within {@code startLogFile} to start from
     * @param endLogFile   last log file to read (inclusive), or {@code null} for no upper bound
     * @return promise of the item stream together with the position reached when the stream ends
     */
    @Override
    public Promise<StreamSupplierWithResult<T, LogPosition>> read(@NotNull String logPartition, @NotNull LogFile startLogFile, long startOffset, @Nullable LogFile endLogFile) {
        validateLogPartition(logPartition);
        LogPosition startPosition = LogPosition.create(startLogFile, startOffset);

        return fs.list(namingScheme.getListGlob(logPartition))
                .map(files -> files.keySet().stream()
                        .map(namingScheme::parse)
                        .filter(Objects::nonNull)
                        .filter(partitionAndFile -> partitionAndFile.getLogPartition().equals(logPartition))
                        .map(PartitionAndFile::getLogFile)
                        .collect(Collectors.toList()))
                .map(logFiles -> logFiles.stream()
                        .filter(logFile -> isFileInRange(logFile, startPosition, endLogFile))
                        .map(logFile -> logFile.equals(startPosition.getLogFile()) ?
                                startPosition :
                                LogPosition.create(logFile, 0L))
                        .sorted()
                        .collect(Collectors.toList()))
                .map(logFilesToRead -> readLogFiles(logPartition, startPosition, logFilesToRead));
    }

    /**
     * Concatenates the streams of the given log files, opening each file only when it is requested.
     *
     * <p>The result promise is set when the iterator reports that no files are left. Its value
     * is the start position of the last opened file advanced by the compressed bytes counted for
     * that file, or {@code startPosition} if no file was opened at all.
     *
     * @param logPartition  partition being read
     * @param startPosition position reported when there is nothing to read
     * @param logFiles      files to read, each paired with the offset to start from
     * @return concatenated item stream and the promise of the final position
     */
    private StreamSupplierWithResult<T, LogPosition> readLogFiles(@NotNull String logPartition, @NotNull LogPosition startPosition, @NotNull List<LogPosition> logFiles) {
        SettablePromise<LogPosition> positionPromise = new SettablePromise<>();

        Iterator<StreamSupplier<T>> logFileStreams = new Iterator<StreamSupplier<T>>() {
            final Stopwatch sw = Stopwatch.createUnstarted();
            final Iterator<LogPosition> it = logFiles.iterator();

            // Position the current file is read from; null until the first file is opened.
            LogPosition currentPosition;
            // Compressed bytes (block headers included) consumed from the current file so far.
            long inputStreamPosition;

            @Override
            public boolean hasNext() {
                if (it.hasNext()) {
                    return true;
                }
                // All files consumed: publish the final position (no-op if it is already set).
                positionPromise.trySet(getLogPosition());
                return false;
            }

            LogPosition getLogPosition() {
                if (currentPosition == null) {
                    return startPosition;
                }
                return LogPosition.create(currentPosition.getLogFile(), currentPosition.getPosition() + inputStreamPosition);
            }

            @Override
            public StreamSupplier<T> next() {
                currentPosition = it.next();
                long position = currentPosition.getPosition();
                LogFile currentLogFile = currentPosition.getLogFile();
                if (logger.isTraceEnabled()) {
                    logger.trace("Read log file `{}` from: {}", currentLogFile, position);
                }

                return StreamSupplier.ofPromise(
                        fs.download(namingScheme.path(logPartition, currentLogFile), position, Long.MAX_VALUE)
                                .map(fileStream -> {
                                    // Counter and stopwatch restart only once the download has been opened.
                                    inputStreamPosition = 0L;
                                    sw.reset().start();
                                    return fileStream
                                            .transformWith(streamReads.register(logPartition + ":" + currentLogFile + "@" + position))
                                            .transformWith(streamReadStats)
                                            .transformWith(ChannelLZ4Decompressor.create()
                                                    .withInspector(new ChannelLZ4Decompressor.Inspector() {

                                                        public <Q extends ChannelLZ4Decompressor.Inspector> Q lookup(Class<Q> type) {
                                                            throw new UnsupportedOperationException();
                                                        }

                                                        public void onBlock(ChannelLZ4Decompressor self, ChannelLZ4Decompressor.Header header, ByteBuf inputBuf, ByteBuf outputBuf) {
                                                            // Advance by the whole block: header plus compressed payload.
                                                            inputStreamPosition += ChannelLZ4Decompressor.HEADER_LENGTH + header.compressedLen;
                                                        }
                                                    }))
                                            // A truncated file is treated as a regular end of stream, not as a failure.
                                            .transformWith(supplier ->
                                                    supplier.withEndOfStream(eos ->
                                                            eos.thenEx(($, e) -> e == null || e instanceof TruncatedDataException ?
                                                                    Promise.complete() :
                                                                    Promise.ofException(e))))
                                            .transformWith(ChannelDeserializer.create(serializer))
                                            .withEndOfStream(eos ->
                                                    eos.whenComplete(($, e) -> log(e)));
                                }));
            }

            // Compressed bytes per whole elapsed second; the divisor is clamped to 1 to avoid division by zero.
            private long bytesPerSecond() {
                return inputStreamPosition / Math.max(sw.elapsed(TimeUnit.SECONDS), 1L);
            }

            // Logs completion of the current file: trace on success, error (with the cause) on failure.
            private void log(Throwable e) {
                if (e == null && logger.isTraceEnabled()) {
                    logger.trace("Finish log file {}:`{}` in {}, compressed bytes: {} ({} bytes/s)",
                            fs, namingScheme.path(logPartition, currentPosition.getLogFile()),
                            sw, inputStreamPosition, bytesPerSecond());
                } else if (e != null && logger.isErrorEnabled()) {
                    // The trailing Throwable is not a format argument; SLF4J logs it as the exception.
                    logger.error("Error on log file {}:`{}` in {}, compressed bytes: {} ({} bytes/s)",
                            fs, namingScheme.path(logPartition, currentPosition.getLogFile()),
                            sw, inputStreamPosition, bytesPerSecond(), e);
                }
            }
        };

        return StreamSupplierWithResult.of(StreamSupplier.concat(logFileStreams), positionPromise);
    }

    /**
     * Rejects partition names that contain a dash.
     *
     * @param logPartition partition name to check
     */
    private static void validateLogPartition(@NotNull String logPartition) {
        Checks.checkArgument(!logPartition.contains("-"), "Using dash (-) in log partition name is not allowed");
    }

    /**
     * Tells whether a log file lies within the requested range.
     *
     * @param logFile       file to test
     * @param startPosition position whose log file is the inclusive lower bound
     * @param endFile       inclusive upper bound, or {@code null} for no upper bound
     * @return {@code true} if {@code logFile} is not before the start file and not after {@code endFile}
     */
    private static boolean isFileInRange(@NotNull LogFile logFile, @NotNull LogPosition startPosition, @Nullable LogFile endFile) {
        if (logFile.compareTo(startPosition.getLogFile()) < 0) {
            return false;
        }
        return endFile == null || logFile.compareTo(endFile) <= 0;
    }

    /**
     * @return the event loop this multilog was created with
     */
    @NotNull
    public Eventloop getEventloop() {
        return eventloop;
    }
}

