package io.activej.multilog;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.time.CurrentTimeProvider;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.fs.ActiveFs;
import io.activej.promise.Promise;
import java.util.function.UnaryOperator;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/multilog/LogStreamChunker.class */
/**
 * Communicating process that drains a {@link ChannelSupplier} of {@link ByteBuf}s and appends the
 * data to log files of a single partition in an {@link ActiveFs}.
 *
 * <p>Before every buffer is written, the {@link LogNamingScheme} is asked for the chunk that
 * corresponds to the current time. If no chunk is open yet, or the name of the open chunk sorts
 * before the newly computed name, the current consumer is flushed and a new file is opened.
 *
 * <p>The class keeps mutable, unsynchronized state ({@code input}, {@code currentConsumer},
 * {@code currentChunk}); it is presumably meant to be driven from a single event-loop thread like
 * other ActiveJ processes.
 */
final class LogStreamChunker extends AbstractCommunicatingProcess implements ChannelInput<ByteBuf> {
    private final CurrentTimeProvider currentTimeProvider;
    private final ActiveFs fs;
    private final LogNamingScheme namingScheme;
    private final String logPartition;
    private final UnaryOperator<ChannelConsumer<ByteBuf>> consumerTransformer;

    /** Source of the data; {@code null} until {@link #set(ChannelSupplier)} has been called. */
    private ChannelSupplier<ByteBuf> input;

    /** Consumer of the currently open chunk; {@code null} when no chunk is open. */
    @Nullable
    private ChannelConsumer<ByteBuf> currentConsumer;

    /** Chunk that was opened most recently; {@code null} before the first chunk is started. */
    private LogFile currentChunk;

    /**
     * Creates a chunker for one log partition.
     *
     * @param currentTimeProvider clock used to decide which chunk the incoming data belongs to
     * @param activeFs            file system the chunks are appended to
     * @param logNamingScheme     maps timestamps to chunks and chunks to file paths
     * @param str                 name of the log partition, passed to {@code LogNamingScheme.path}
     * @param unaryOperator       transformation applied to every freshly opened file consumer
     */
    public LogStreamChunker(CurrentTimeProvider currentTimeProvider, ActiveFs activeFs, LogNamingScheme logNamingScheme, String str, UnaryOperator<ChannelConsumer<ByteBuf>> unaryOperator) {
        this.currentTimeProvider = currentTimeProvider;
        this.fs = activeFs;
        this.namingScheme = logNamingScheme;
        this.logPartition = str;
        this.consumerTransformer = unaryOperator;
    }

    /**
     * Binds the input supplier and starts the process ({@link ChannelInput} contract).
     *
     * @param channelSupplier supplier of the buffers to be written
     * @return the completion promise of this process
     */
    public Promise<Void> set(ChannelSupplier<ByteBuf> channelSupplier) {
        this.input = sanitize(channelSupplier);
        startProcess();
        return getProcessCompletion();
    }

    /**
     * Pulls one buffer from the input, writes it to the chunk matching the current time and then
     * repeats. A {@code null} buffer marks the end of the stream: the open chunk is flushed and
     * the process is completed.
     */
    protected void doProcess() {
        this.input.get().whenResult(byteBuf -> {
            if (byteBuf == null) {
                // End of stream: finish the open chunk (if any) before completing.
                flush().whenResult(() -> completeProcess());
                return;
            }
            ensureConsumer()
                    .then(() -> this.currentConsumer.accept(byteBuf))
                    .whenResult(this::doProcess);
        });
    }

    /**
     * Makes sure {@code currentConsumer} points at the chunk for the current time, starting a new
     * chunk when none is open or when the open chunk's name sorts before the new one.
     */
    private Promise<Void> ensureConsumer() {
        LogFile candidate = this.namingScheme.format(this.currentTimeProvider.currentTimeMillis());
        boolean chunkIsCurrent = this.currentChunk != null
                && this.currentChunk.getName().compareTo(candidate.getName()) >= 0;
        return chunkIsCurrent ? Promise.complete() : startNewChunk(candidate);
    }

    /**
     * Flushes the open chunk and opens a consumer for the next one.
     *
     * @param logFile chunk computed by the naming scheme for the current time
     */
    private Promise<Void> startNewChunk(LogFile logFile) {
        return flush().then(() -> {
            // The very first chunk is used exactly as produced by the naming scheme; every later
            // chunk is rebuilt from the name with 0 as the second LogFile constructor argument.
            this.currentChunk = this.currentChunk == null ? logFile : new LogFile(logFile.getName(), 0);
            String path = this.namingScheme.path(this.logPartition, this.currentChunk);
            return this.fs.append(path, 0L)
                    .then((consumer, exc) -> doSanitize(consumer, exc))
                    .whenResult(consumer -> {
                        this.currentConsumer = this.consumerTransformer.apply(sanitize(consumer));
                    })
                    .toVoid();
        });
    }

    /**
     * Sends end-of-stream to the current consumer, if there is one, and clears the reference once
     * that succeeds. Completes immediately when no consumer is open.
     */
    private Promise<Void> flush() {
        if (this.currentConsumer == null) {
            return Promise.complete();
        }
        return this.currentConsumer.acceptEndOfStream().whenResult(() -> {
            this.currentConsumer = null;
        });
    }

    /**
     * Propagates the closing exception to the input supplier and to the open chunk consumer.
     *
     * @param exc exception the process is being closed with
     */
    protected void doClose(Exception exc) {
        // input is only assigned in set(...); guard it so that closing a process that was never
        // started does not fail with a NullPointerException.
        if (this.input != null) {
            this.input.closeEx(exc);
        }
        if (this.currentConsumer != null) {
            this.currentConsumer.closeEx(exc);
        }
    }
}
