package io.activej.multilog;

import io.activej.async.function.AsyncFunctionEx;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.exception.TruncatedDataException;
import io.activej.common.function.FunctionEx;
import io.activej.common.ref.RefBoolean;
import io.activej.common.time.Stopwatch;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.process.frame.ChannelFrameDecoder;
import io.activej.csp.process.frame.ChannelFrameEncoder;
import io.activej.csp.process.frame.FrameFormat;
import io.activej.csp.process.transformer.ChannelConsumerTransformer;
import io.activej.csp.process.transformer.ChannelSupplierTransformer;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.csp.supplier.ChannelSuppliers;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.consumer.StreamConsumers;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.processor.transformer.StreamSupplierTransformer;
import io.activej.datastream.stats.DetailedStreamStats;
import io.activej.datastream.stats.StreamRegistry;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsSizeCounter;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.datastream.supplier.StreamSupplierWithResult;
import io.activej.datastream.supplier.StreamSuppliers;
import io.activej.fs.IFileSystem;
import io.activej.fs.exception.IllegalOffsetException;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import io.activej.serializer.BinarySerializer;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/multilog/Multilog.class */
public final class Multilog<T> extends AbstractReactive implements IMultilog<T>, ReactiveJmxBeanWithStats {
    /** Class-wide SLF4J logger; also used by the anonymous iterator created in {@code readLogFiles}. */
    private static final Logger logger = LoggerFactory.getLogger(Multilog.class);
    /** Buffer size the constructor installs before any builder override: 256 kilobytes. */
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256);
    /** File system that {@code read} lists and downloads log files from, and that {@code write} hands to {@code LogStreamChunker}. */
    private final IFileSystem fileSystem;
    /** Supplies the listing glob for a partition, parses listed file names, and builds the path of a log file. */
    private final LogNamingScheme namingScheme;
    /** Item serializer, given to {@code ChannelSerializer} on write and to {@code ChannelDeserializer} on read. */
    private final BinarySerializer<T> serializer;
    /** Passed to {@code ChannelSerializer} as its initial buffer size; settable through the builder. */
    private MemSize bufferSize;
    /** Passed to {@code ChannelSerializer.withAutoFlushInterval}; the constructor leaves it {@code null}. */
    private Duration autoFlushInterval;
    /**
     * When {@code true}, the read path logs a warning and carries on instead of failing on an
     * {@code IllegalOffsetException} from a download or a {@code MalformedDataException} at end of stream.
     */
    private boolean ignoreMalformedLogs;
    /** Frame format used by the frame encoder on write and, wrapped in a {@code CountingFrameFormat}, by the frame decoder on read. */
    private final FrameFormat frameFormat;
    /** Registry in which every opened read stream is registered under {@code partition:logFile@position}. */
    private final StreamRegistry<String> streamReads;
    /** Registry in which every write stream is registered under its partition name. */
    private final StreamRegistry<String> streamWrites;
    /** Stats transformer (built with a ByteBuf size counter) applied to the byte channel of each read stream. */
    private final DetailedStreamStats<ByteBuf> streamReadStats;
    /** Stats transformer (built with a ByteBuf size counter) applied to the byte channel of each write stream. */
    private final DetailedStreamStats<ByteBuf> streamWriteStats;

    /* loaded from: input_file:io/activej/multilog/Multilog$Builder.class */
    public final class Builder extends AbstractBuilder<Multilog<T>.Builder, Multilog<T>> {
        private Builder() {
        }

        public Multilog<T>.Builder withBufferSize(int i) {
            checkNotBuilt(this);
            Multilog.this.bufferSize = MemSize.of(i);
            return this;
        }

        public Multilog<T>.Builder withBufferSize(MemSize memSize) {
            checkNotBuilt(this);
            Multilog.this.bufferSize = memSize;
            return this;
        }

        public Multilog<T>.Builder withAutoFlushInterval(Duration duration) {
            checkNotBuilt(this);
            Multilog.this.autoFlushInterval = duration;
            return this;
        }

        public Multilog<T>.Builder withIgnoreMalformedLogs(boolean z) {
            checkNotBuilt(this);
            Multilog.this.ignoreMalformedLogs = z;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public Multilog<T> m3doBuild() {
            return Multilog.this;
        }
    }

    /**
     * Wires the collaborators and installs the default settings. The constructor is private;
     * instances are obtained through the static {@code create} and {@code builder} factories.
     *
     * @param reactor          reactor passed to the {@code AbstractReactive} superclass
     * @param iFileSystem      file system holding the log files
     * @param frameFormat      frame format applied to the stored byte stream
     * @param binarySerializer serializer for the logged items
     * @param logNamingScheme  scheme mapping partitions and log files to file names
     */
    private Multilog(Reactor reactor, IFileSystem iFileSystem, FrameFormat frameFormat, BinarySerializer<T> binarySerializer, LogNamingScheme logNamingScheme) {
        super(reactor);

        // Collaborators supplied by the caller.
        this.fileSystem = iFileSystem;
        this.namingScheme = logNamingScheme;
        this.serializer = binarySerializer;
        this.frameFormat = frameFormat;

        // Tunables start at their defaults; the builder may overwrite them afterwards.
        this.autoFlushInterval = null;
        this.bufferSize = DEFAULT_BUFFER_SIZE;

        // Bookkeeping objects exposed through the JMX getters.
        this.streamWrites = StreamRegistry.create();
        this.streamReads = StreamRegistry.create();
        this.streamWriteStats = (DetailedStreamStats) StreamStats.detailedBuilder().withSizeCounter(StreamStatsSizeCounter.forByteBufs()).build();
        this.streamReadStats = (DetailedStreamStats) StreamStats.detailedBuilder().withSizeCounter(StreamStatsSizeCounter.forByteBufs()).build();
    }

    /**
     * Creates a {@link Multilog} with default settings by obtaining a builder for the given
     * collaborators and building it immediately.
     *
     * @return the built multilog
     */
    public static <T> Multilog<T> create(Reactor reactor, IFileSystem iFileSystem, FrameFormat frameFormat, BinarySerializer<T> binarySerializer, LogNamingScheme logNamingScheme) {
        Multilog<T>.Builder configurator = builder(reactor, iFileSystem, frameFormat, binarySerializer, logNamingScheme);
        return configurator.build();
    }

    /**
     * Starts configuring a new {@link Multilog} backed by the given collaborators.
     *
     * @param reactor          reactor the multilog belongs to
     * @param iFileSystem      file system holding the log files
     * @param frameFormat      frame format applied to the stored byte stream
     * @param binarySerializer serializer for the logged items
     * @param logNamingScheme  scheme mapping partitions and log files to file names
     * @return a builder whose product is the newly constructed multilog
     */
    public static <T> Multilog<T>.Builder builder(Reactor reactor, IFileSystem iFileSystem, FrameFormat frameFormat, BinarySerializer<T> binarySerializer, LogNamingScheme logNamingScheme) {
        // Builder is an inner (non-static) class that configures its enclosing instance, so the
        // instance has to exist first; a bare "new Builder()" has no enclosing instance here.
        Multilog<T> multilog = new Multilog<>(reactor, iFileSystem, frameFormat, binarySerializer, logNamingScheme);
        return multilog.new Builder();
    }

    /**
     * Creates a consumer that appends items to the given log partition.
     * <p>
     * Items received by the consumer go through these stages, in order:
     * a {@code ChannelSerializer} configured with the auto-flush interval, the initial buffer size
     * and {@code withSkipSerializationErrors()}; registration in {@code streamWrites} under the
     * partition name; the {@code streamWriteStats} transformer; and finally a
     * {@code LogStreamChunker} whose output is frame-encoded with this multilog's frame format.
     * A failure of the consumer's acknowledgement is reported as a {@link MultilogException}
     * wrapping the original cause.
     *
     * @param str log partition name; must not contain a dash
     * @return an already completed promise of the consumer
     */
    @Override // io.activej.multilog.IMultilog
    public Promise<StreamConsumer<T>> write(String str) {
        Reactive.checkInReactorThread(this);
        validateLogPartition(str);
        // The explicit <T> is required: ofSupplier(...) is the receiver of withAcknowledgement(...),
        // so there is no target type from which the item type could be inferred.
        return Promise.of(StreamConsumers.<T>ofSupplier(items -> items
                        .transformWith(ChannelSerializer.builder(this.serializer)
                                .withAutoFlushInterval(this.autoFlushInterval)
                                .withInitialBufferSize(this.bufferSize)
                                .withSkipSerializationErrors()
                                .build())
                        .transformWith(this.streamWrites.register(str))
                        .transformWith(this.streamWriteStats)
                        .bindTo(new LogStreamChunker(this.reactor, this.fileSystem, this.namingScheme, str,
                                sink -> sink.transformWith(ChannelFrameEncoder.builder(this.frameFormat)
                                        .withEncoderResets()
                                        .build()))))
                .withAcknowledgement(ack -> ack.mapException(
                        cause -> new MultilogException("Failed to write logs to partition '" + str + "'", cause))));
    }

    /**
     * Opens a stream over the items of a log partition, beginning at a position inside a start file.
     * <p>
     * The partition's files are listed through the naming scheme's glob, file names the scheme
     * cannot parse (or that belong to another partition) are dropped, and the remaining log files
     * are narrowed to those from {@code logFile} up to and including {@code logFile2}. The start
     * file is read from offset {@code j}, every later file from offset 0, in sorted order.
     * Any failure is reported as a {@link MultilogException} wrapping the original cause.
     *
     * @param str      log partition name; must not contain a dash
     * @param logFile  first log file to read
     * @param j        offset inside {@code logFile} to start from
     * @param logFile2 last log file to read (inclusive), or {@code null} for no upper bound
     * @return promise of the item stream paired with the log position reached once the stream
     *         has run out of files
     */
    @Override // io.activej.multilog.IMultilog
    public Promise<StreamSupplierWithResult<T, LogPosition>> read(String str, LogFile logFile, long j, @Nullable LogFile logFile2) {
        Reactive.checkInReactorThread(this);
        validateLogPartition(str);
        LogPosition startPosition = LogPosition.create(logFile, j);
        return this.fileSystem.list(this.namingScheme.getListGlob(str))
                .map(files -> files.keySet().stream()
                        .map(this.namingScheme::parse)
                        .filter(Objects::nonNull)
                        .filter(parsed -> parsed.logPartition().equals(str))
                        .map(parsed -> parsed.logFile())
                        .collect(Collectors.toList()))
                .map(logFiles -> {
                    // Stays true unless some listed file lies beyond the requested upper bound,
                    // i.e. unless the selection stops short of the newest listed file.
                    RefBoolean endsAtNewestFile = new RefBoolean(true);
                    List<LogPosition> positions = logFiles.stream()
                            .filter(candidate -> {
                                if (candidate.compareTo(startPosition.getLogFile()) < 0) {
                                    return false;
                                }
                                boolean withinUpperBound = logFile2 == null || candidate.compareTo(logFile2) <= 0;
                                if (!withinUpperBound) {
                                    endsAtNewestFile.set(false);
                                }
                                return withinUpperBound;
                            })
                            .map(candidate -> candidate.equals(startPosition.getLogFile())
                                    ? startPosition
                                    : LogPosition.create(candidate, 0L))
                            .sorted()
                            .collect(Collectors.toList());
                    // The flag is written by the filter above, so it must only be read once the
                    // stream has been fully consumed by collect().
                    return readLogFiles(str, startPosition, positions, endsAtNewestFile.get());
                })
                .mapException(cause -> new MultilogException("Failed to read logs from partition '" + str + "'", cause));
    }

    /**
     * Builds one item stream that reads the given log files one after another, together with a
     * promise of the position at which reading ended.
     * <p>
     * Files are opened lazily by an iterator handed to {@code StreamSuppliers.concat}. For each
     * file the bytes are downloaded from its start offset, registered in {@code streamReads},
     * measured by {@code streamReadStats}, frame-decoded through a byte-counting frame format and
     * deserialized into items. When the iterator reports that no files remain, the result promise
     * is set to the start position if no file was ever opened, otherwise to the last opened file
     * at its start offset plus the number of bytes counted for it.
     *
     * @param str         log partition name
     * @param logPosition position reported when {@code list} yields no file at all
     * @param list        files to read, each with the offset to start from
     * @param z           when {@code true}, a {@code TruncatedDataException} on the final file of
     *                    {@code list} is treated as a normal end of stream
     * @return the concatenated item stream and the promise of the final log position
     */
    private StreamSupplierWithResult<T, LogPosition> readLogFiles(final String str, final LogPosition logPosition, final List<LogPosition> list, final boolean z) {
        final SettablePromise<LogPosition> positionPromise = new SettablePromise<>();
        Iterator<StreamSupplier<T>> fileStreams = new Iterator<StreamSupplier<T>>() {
            final Stopwatch sw = Stopwatch.createUnstarted();
            final Iterator<LogPosition> it = list.iterator();
            // One counter shared by all files; it is reset each time a file's stream is opened.
            final CountingFrameFormat countingFormat = new CountingFrameFormat(Multilog.this.frameFormat);
            LogPosition currentPosition;

            @Override
            public boolean hasNext() {
                if (this.it.hasNext()) {
                    return true;
                }
                // Out of files: publish where reading stopped. trySet keeps repeated calls harmless.
                positionPromise.trySet(getLogPosition());
                return false;
            }

            /** Position reached so far: the start position until the first file has been opened. */
            LogPosition getLogPosition() {
                if (this.currentPosition == null) {
                    return logPosition;
                }
                return LogPosition.create(this.currentPosition.getLogFile(), this.currentPosition.getPosition() + this.countingFormat.getCount());
            }

            @Override
            public StreamSupplier<T> next() {
                this.currentPosition = this.it.next();
                final long position = this.currentPosition.getPosition();
                final LogFile logFile = this.currentPosition.getLogFile();
                if (Multilog.logger.isTraceEnabled()) {
                    Multilog.logger.trace("Read log file `{}` from: {}", logFile, position);
                }
                Promise<ChannelSupplier<ByteBuf>> opened = Multilog.this.fileSystem
                        .download(Multilog.this.namingScheme.path(str, logFile), position, Long.MAX_VALUE)
                        .then(Promise::of, downloadError -> {
                            if (!Multilog.this.ignoreMalformedLogs || !(downloadError instanceof IllegalOffsetException)) {
                                return Promise.ofException(downloadError);
                            }
                            // The file is shorter than the requested offset: treat it as empty.
                            if (Multilog.logger.isWarnEnabled()) {
                                Multilog.logger.warn("Ignoring log file whose size is less than log position {} {}:`{}` in {}, previous position: {}",
                                        position, Multilog.this.fileSystem, Multilog.this.namingScheme.path(str, this.currentPosition.getLogFile()),
                                        this.sw, this.countingFormat.getCount(), downloadError);
                            }
                            return Promise.of(ChannelSuppliers.<ByteBuf>empty());
                        });
                return StreamSuppliers.ofPromise(opened.map(fileBytes -> {
                    this.countingFormat.resetCount();
                    this.sw.reset().start();
                    return fileBytes
                            .transformWith(Multilog.this.streamReads.register(str + ":" + logFile + "@" + position))
                            .transformWith(Multilog.this.streamReadStats)
                            .transformWith(ChannelFrameDecoder.builder(this.countingFormat).withDecoderResets().build())
                            .transformWith(decoded -> decoded.withEndOfStream(
                                    endOfStream -> endOfStream.map(FunctionEx.identity(), this::tolerateEndOfStreamError)))
                            .transformWith(ChannelDeserializer.create(Multilog.this.serializer))
                            .withEndOfStream(endOfStream -> endOfStream.whenComplete((ignored, failure) -> log(failure)));
                }));
            }

            /**
             * Decides whether a failure at the end of the decoded byte stream ends the file normally.
             * Tolerated: truncated data on the final file when {@code z} is set, and malformed data
             * when malformed logs are ignored (a warning is logged). Anything else is rethrown.
             */
            private Void tolerateEndOfStreamError(Exception failure) throws Exception {
                if ((failure instanceof TruncatedDataException) && !this.it.hasNext() && z) {
                    return null;
                }
                if (!Multilog.this.ignoreMalformedLogs || !(failure instanceof MalformedDataException)) {
                    throw failure;
                }
                if (Multilog.logger.isWarnEnabled()) {
                    Multilog.logger.warn("Ignoring malformed log file {}:`{}` in {}, previous position: {}",
                            Multilog.this.fileSystem, Multilog.this.namingScheme.path(str, this.currentPosition.getLogFile()),
                            this.sw, this.countingFormat.getCount(), failure);
                }
                return null;
            }

            /** Logs completion of the current file: trace level on success, error level on failure. */
            private void log(Exception exc) {
                if (exc == null) {
                    if (Multilog.logger.isTraceEnabled()) {
                        Multilog.logger.trace("Finish log file {}:`{}` in {}, compressed bytes: {} ({} bytes/s)",
                                Multilog.this.fileSystem, Multilog.this.namingScheme.path(str, this.currentPosition.getLogFile()),
                                this.sw, this.countingFormat.getCount(),
                                this.countingFormat.getCount() / Math.max(this.sw.elapsed(TimeUnit.SECONDS), 1L));
                    }
                    return;
                }
                if (Multilog.logger.isErrorEnabled()) {
                    Multilog.logger.error("Error on log file {}:`{}` in {}, compressed bytes: {} ({} bytes/s)",
                            Multilog.this.fileSystem, Multilog.this.namingScheme.path(str, this.currentPosition.getLogFile()),
                            this.sw, this.countingFormat.getCount(),
                            this.countingFormat.getCount() / Math.max(this.sw.elapsed(TimeUnit.SECONDS), 1L), exc);
                }
            }
        };
        return StreamSupplierWithResult.of(StreamSuppliers.concat(fileStreams), positionPromise);
    }

    /**
     * Rejects partition names containing a dash by failing the {@code Checks.checkArgument}
     * precondition with a fixed message.
     *
     * @param str partition name to validate
     */
    private static void validateLogPartition(String str) {
        boolean dashFree = str.indexOf('-') < 0;
        Checks.checkArgument(dashFree, "Using dash (-) in log partition name is not allowed");
    }

    /** JMX attribute: whether reading tolerates malformed log files instead of failing. */
    @JmxAttribute
    public boolean isIgnoreMalformedLogs() {
        return ignoreMalformedLogs;
    }

    /** JMX attribute: the currently configured buffer size. */
    @JmxAttribute
    public MemSize getBufferSize() {
        return bufferSize;
    }

    /** JMX attribute: the configured auto-flush interval; {@code null} unless one was set. */
    @JmxAttribute
    public Duration getAutoFlushInterval() {
        return autoFlushInterval;
    }

    /** JMX attribute: the registry in which read streams are registered. */
    @JmxAttribute
    public StreamRegistry getStreamReads() {
        return streamReads;
    }

    /** JMX attribute: the registry in which write streams are registered. */
    @JmxAttribute
    public StreamRegistry getStreamWrites() {
        return streamWrites;
    }

    /** JMX attribute: the stats object applied to the byte channel of read streams. */
    @JmxAttribute
    public DetailedStreamStats getStreamReadStats() {
        return streamReadStats;
    }

    /** JMX attribute: the stats object applied to the byte channel of write streams. */
    @JmxAttribute
    public DetailedStreamStats getStreamWriteStats() {
        return streamWriteStats;
    }
}
