/*
 * Decompiled with CFR 0.152.
 */
package io.activej.multilog;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.time.CurrentTimeProvider;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.fs.ActiveFs;
import io.activej.multilog.LogFile;
import io.activej.multilog.LogNamingScheme;
import io.activej.promise.Promise;
import org.jetbrains.annotations.Nullable;

final class LogStreamChunker
extends AbstractCommunicatingProcess
implements ChannelInput<ByteBuf> {
    private final CurrentTimeProvider currentTimeProvider;
    private final ActiveFs fs;
    private final LogNamingScheme namingScheme;
    private final String logPartition;
    private ChannelSupplier<ByteBuf> input;
    @Nullable
    private ChannelConsumer<ByteBuf> currentConsumer;
    private LogFile currentChunk;

    public LogStreamChunker(CurrentTimeProvider currentTimeProvider, ActiveFs fs, LogNamingScheme namingScheme, String logPartition) {
        this.currentTimeProvider = currentTimeProvider;
        this.fs = fs;
        this.namingScheme = namingScheme;
        this.logPartition = logPartition;
    }

    public Promise<Void> set(ChannelSupplier<ByteBuf> input) {
        this.input = this.sanitize(input);
        this.startProcess();
        return this.getProcessCompletion();
    }

    protected void doProcess() {
        this.input.get().whenResult(buf -> {
            if (buf != null) {
                this.ensureConsumer().then(() -> this.currentConsumer.accept(buf)).whenResult(this::doProcess);
            } else {
                this.flush().whenResult(() -> ((LogStreamChunker)this).completeProcess());
            }
        });
    }

    private Promise<Void> ensureConsumer() {
        LogFile newChunkName = this.namingScheme.format(this.currentTimeProvider.currentTimeMillis());
        return this.currentChunk != null && this.currentChunk.getName().compareTo(newChunkName.getName()) >= 0 ? Promise.complete() : this.startNewChunk(newChunkName);
    }

    private Promise<Void> startNewChunk(LogFile newChunkName) {
        return this.flush().then(() -> {
            this.currentChunk = this.currentChunk == null ? newChunkName : new LogFile(newChunkName.getName(), 0);
            return this.fs.append(this.namingScheme.path(this.logPartition, this.currentChunk), 0L).thenEx((arg_0, arg_1) -> ((LogStreamChunker)this).sanitize(arg_0, arg_1)).whenResult(newConsumer -> {
                this.currentConsumer = this.sanitize((ChannelConsumer)newConsumer);
            }).toVoid();
        });
    }

    private Promise<Void> flush() {
        if (this.currentConsumer == null) {
            return Promise.complete();
        }
        return this.currentConsumer.acceptEndOfStream().whenResult(() -> {
            this.currentConsumer = null;
        });
    }

    protected void doClose(Throwable e) {
        this.input.closeEx(e);
        if (this.currentConsumer != null) {
            this.currentConsumer.closeEx(e);
        }
    }
}

