/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.file;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.file.AsyncFile;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamDataReceiver;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream consumer that appends every received {@link ByteBuf} to an {@link AsyncFile}.
 *
 * <p>Incoming buffers are queued and written one at a time, each at the current
 * {@code position}. The producer is suspended while the queue exceeds the configured
 * limits ({@link #withMaxBuffers(int)} / {@link #withMaxBufferedBytes(long)}) and is
 * resumed once the queue is back within them. When the stream ends (or fails) the
 * file is closed, optionally forcing it first, and the outcome of that close is
 * published through {@link #getFlushStage()}.
 */
public final class StreamFileWriter
extends AbstractStreamConsumer<ByteBuf>
implements StreamDataReceiver<ByteBuf> {
    private static final Logger logger = LoggerFactory.getLogger(StreamFileWriter.class);

    /** Options used by {@link #create(ExecutorService, Path)}: open for writing, create if missing, truncate. */
    public static final OpenOption[] CREATE_OPTIONS = new OpenOption[]{StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING};

    /** Buffers received but not yet handed to the file. */
    private final Deque<ByteBuf> bufs = new ArrayDeque<>();
    /** Completed with the result of closing the file, or with the stream error. */
    private final SettableStage<Void> flushStage = SettableStage.create();
    private final AsyncFile asyncFile;

    /** {@code true} while the write loop ({@link #process()}) is active. */
    private boolean writing = false;
    /** Set once {@link #close()} has started, so the file is closed at most once. */
    private boolean closeStarted = false;
    private boolean forceOnClose = false;
    private boolean forceMetadata = false;
    private long maxBufferedBytes = 0L;
    private int maxBuffers = 1;
    /** Bytes accepted in {@link #onData(ByteBuf)} whose write has not completed yet. */
    private long bufferedBytes = 0L;
    /** File offset at which the next buffer will be written. */
    private long position = 0L;

    private StreamFileWriter(AsyncFile asyncFile) {
        this.asyncFile = asyncFile;
    }

    /**
     * Opens {@code path} with {@link #CREATE_OPTIONS} and creates a writer for it.
     *
     * @param executor executor passed to {@code AsyncFile.open}
     * @param path     file to write
     * @return a new writer positioned at offset 0
     * @throws IOException if the file cannot be opened
     */
    public static StreamFileWriter create(ExecutorService executor, Path path) throws IOException {
        return create(AsyncFile.open(executor, path, CREATE_OPTIONS));
    }

    /**
     * Creates a writer over an already opened file.
     *
     * @param asyncFile file to write to, starting at offset 0
     * @return a new writer
     */
    public static StreamFileWriter create(AsyncFile asyncFile) {
        return new StreamFileWriter(asyncFile);
    }

    /**
     * Returns this consumer combined with its flush stage as the result.
     *
     * @return a consumer-with-result backed by {@link #getFlushStage()}
     */
    public StreamConsumerWithResult<ByteBuf, Void> withFlushAsResult() {
        return withResult(flushStage);
    }

    /**
     * Makes the writer call {@code forceAndClose} instead of a plain {@code close} when finishing.
     *
     * @param forceMetadata value passed to {@code AsyncFile.forceAndClose}
     * @return this writer
     */
    public StreamFileWriter withForceOnClose(boolean forceMetadata) {
        this.forceOnClose = true;
        this.forceMetadata = forceMetadata;
        return this;
    }

    /**
     * Sets the number of pending bytes above which the producer is suspended.
     *
     * @param bytes byte limit (default 0)
     * @return this writer
     */
    public StreamFileWriter withMaxBufferedBytes(long bytes) {
        this.maxBufferedBytes = bytes;
        return this;
    }

    /**
     * Sets the number of queued buffers above which the producer is suspended.
     *
     * @param buffers buffer-count limit (default 1)
     * @return this writer
     */
    public StreamFileWriter withMaxBuffers(int buffers) {
        this.maxBuffers = buffers;
        return this;
    }

    /**
     * Returns the stage that completes when the file has been closed, or fails with
     * the stream error or the close error.
     *
     * @return the flush stage
     */
    public Stage<Void> getFlushStage() {
        return flushStage;
    }

    /**
     * Tells whether the queue is within both configured limits. Used for the suspend
     * decision and the resume decision alike, so the two can never disagree.
     */
    private boolean isWithinLimits() {
        return bufs.size() <= maxBuffers && bufferedBytes <= maxBufferedBytes;
    }

    /**
     * One iteration of the write loop: takes the next queued buffer and writes it, then
     * re-enters itself from the completion callback. When the queue is empty it either
     * closes the file (consumer already closed) or stops the loop and asks for more data.
     */
    private void process() {
        ByteBuf data = bufs.poll();
        if (data == null) {
            if (getStatus().isClosed()) {
                // Everything queued has been written and no more data will arrive.
                close();
                return;
            }
            writing = false;
            getProducer().produce(this);
            return;
        }
        // Capture the size now; the buffer's read position is not ours to inspect after the write.
        int length = data.readRemaining();
        asyncFile.writeFully(data, position).whenComplete(($, e) -> {
            if (e != null) {
                // NOTE(review): if closeWithError is a no-op once end-of-stream was received,
                // a write failing after that point would never close the file - confirm
                // against AbstractStreamConsumer.
                closeWithError(e);
                return;
            }
            position += length;
            bufferedBytes -= length;
            // Resume on the same predicate that onData uses to suspend.
            if (isWithinLimits()) {
                getProducer().produce(this);
            }
            process();
        });
    }

    /** Starts the write loop unless it is already running. */
    private void step() {
        if (!writing) {
            writing = true;
            process();
        }
    }

    /**
     * Queues a buffer for writing and applies back-pressure when the queue exceeds its limits.
     * Buffers arriving after the consumer has been closed are recycled and dropped.
     *
     * @param buf data to append to the file
     */
    @Override
    public void onData(ByteBuf buf) {
        if (getStatus().isClosed()) {
            buf.recycle();
            return;
        }
        bufs.offer(buf);
        bufferedBytes += buf.readRemaining();
        if (!isWithinLimits()) {
            getProducer().suspend();
        }
        step();
    }

    /**
     * Recycles any still-queued buffers and closes the file (forcing it first when
     * configured), publishing the outcome to the flush stage.
     *
     * <p>Runs at most once: {@link #onError(Throwable)} may close the file while a write
     * is still in flight, and the completion of that write re-enters {@link #process()},
     * which would otherwise close the file a second time.
     */
    private void close() {
        if (closeStarted) {
            return;
        }
        closeStarted = true;
        for (ByteBuf buf : bufs) {
            buf.recycle();
        }
        bufs.clear();
        (forceOnClose ? asyncFile.forceAndClose(forceMetadata) : asyncFile.close())
                // trySet: the stage may already hold the stream error set by onError.
                .whenComplete((result, error) -> flushStage.trySet(result, error))
                .whenComplete(($, e) -> {
                    if (e == null) {
                        logger.info("{}: closed file", this);
                    } else {
                        logger.error("{}: failed to close file", this, e);
                    }
                });
    }

    /** Requests data from the producer as soon as the stream is started. */
    @Override
    protected void onStarted() {
        getProducer().produce(this);
    }

    /** Makes sure the write loop runs so that it drains the queue and then closes the file. */
    @Override
    protected void onEndOfStream() {
        step();
    }

    /**
     * Fails the flush stage with the stream error and closes the file, discarding queued buffers.
     *
     * @param t the error that closed the stream
     */
    @Override
    protected void onError(Throwable t) {
        flushStage.setException(t);
        close();
    }

    @Override
    public String toString() {
        return "StreamFileWriter{asyncFile=" + asyncFile + ", position=" + position + (writing ? ", writing" : "") + '}';
    }
}

