package io.datakernel.logfs;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.ForwardingStreamConsumer;
import io.datakernel.stream.ForwardingStreamProducer;
import io.datakernel.stream.StreamConsumerSwitcher;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.time.CurrentTimeProvider;
import java.time.Instant;
import java.time.format.DateTimeFormatter;

/* loaded from: input_file:io/datakernel/logfs/LogStreamChunker.class */
public final class LogStreamChunker extends ForwardingStreamConsumer<ByteBuf> implements StreamConsumerWithResult<ByteBuf, Void>, StreamDataReceiver<ByteBuf> {
    private final CurrentTimeProvider currentTimeProvider;
    private final DateTimeFormatter datetimeFormat;
    private final LogFileSystem fileSystem;
    private final String logPartition;
    private final StreamConsumerSwitcher<ByteBuf> switcher;
    private final SettableStage<Void> result;
    private String currentChunkName;
    private StreamDataReceiver<ByteBuf> dataReceiver;
    private StreamConsumerWithResult<ByteBuf, Void> currentConsumer;

    private LogStreamChunker(CurrentTimeProvider currentTimeProvider, LogFileSystem logFileSystem, DateTimeFormatter dateTimeFormatter, String str, StreamConsumerSwitcher<ByteBuf> streamConsumerSwitcher) {
        super(streamConsumerSwitcher);
        this.result = SettableStage.create();
        this.currentTimeProvider = currentTimeProvider;
        this.datetimeFormat = dateTimeFormatter;
        this.fileSystem = logFileSystem;
        this.logPartition = str;
        this.switcher = streamConsumerSwitcher;
        Stage thenCompose = getEndOfStream().thenCompose(r3 -> {
            return this.currentConsumer.getResult();
        });
        SettableStage<Void> settableStage = this.result;
        settableStage.getClass();
        thenCompose.whenComplete((v1, v2) -> {
            r1.trySet(v1, v2);
        });
    }

    public static LogStreamChunker create(LogFileSystem logFileSystem, DateTimeFormatter dateTimeFormatter, String str) {
        return create(Eventloop.getCurrentEventloop(), logFileSystem, dateTimeFormatter, str);
    }

    static LogStreamChunker create(CurrentTimeProvider currentTimeProvider, LogFileSystem logFileSystem, DateTimeFormatter dateTimeFormatter, String str) {
        LogStreamChunker logStreamChunker = new LogStreamChunker(currentTimeProvider, logFileSystem, dateTimeFormatter, str, StreamConsumerSwitcher.create());
        logStreamChunker.startNewChunk(dateTimeFormatter.format(Instant.ofEpochMilli(currentTimeProvider.currentTimeMillis())), Stage.of((Object) null));
        return logStreamChunker;
    }

    public void onData(ByteBuf byteBuf) {
        String format = this.datetimeFormat.format(Instant.ofEpochMilli(this.currentTimeProvider.currentTimeMillis()));
        if (!format.equals(this.currentChunkName)) {
            startNewChunk(format, this.currentConsumer.getResult());
        }
        this.dataReceiver.onData(byteBuf);
    }

    public void setProducer(StreamProducer<ByteBuf> streamProducer) {
        super.setProducer(new ForwardingStreamProducer<ByteBuf>(streamProducer) { // from class: io.datakernel.logfs.LogStreamChunker.1
            public void produce(StreamDataReceiver<ByteBuf> streamDataReceiver) {
                LogStreamChunker.this.dataReceiver = streamDataReceiver;
                super.produce(LogStreamChunker.this);
            }
        });
    }

    private void startNewChunk(String str, Stage<Void> stage) {
        this.currentChunkName = str;
        this.currentConsumer = StreamConsumerWithResult.ofStage(stage.thenCompose(r6 -> {
            return this.fileSystem.makeUniqueLogFile(this.logPartition, str);
        }).thenCompose(logFile -> {
            return this.fileSystem.write(this.logPartition, logFile);
        }));
        this.switcher.switchTo(this.currentConsumer);
    }

    public Stage<Void> getResult() {
        return this.result;
    }
}
