/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.processor;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import io.datakernel.stream.processor.StreamTransformer;
import io.datakernel.util.MemSize;

/**
 * Stream transformer that re-chunks a stream of {@link ByteBuf}s so that the buffers
 * sent downstream have between {@code minChunkSize} and {@code maxChunkSize} readable
 * bytes. The only exception is the final buffer: when the input reaches end of stream,
 * whatever is still buffered is sent as-is, even if it is shorter than {@code minChunkSize}.
 */
public final class StreamByteChunker
implements StreamTransformer<ByteBuf, ByteBuf> {
    private final Input input = new Input();
    private final Output output;

    private StreamByteChunker(MemSize minChunkSize, MemSize maxChunkSize) {
        this.output = new Output(minChunkSize, maxChunkSize);
    }

    /**
     * Creates a new chunker.
     *
     * @param minChunkSize lower bound for emitted chunks; must be positive
     * @param maxChunkSize upper bound for emitted chunks; must be at least {@code minChunkSize}
     * @return a new chunker instance
     * @throws IllegalArgumentException if the bounds are not positive or are inverted
     */
    public static StreamByteChunker create(MemSize minChunkSize, MemSize maxChunkSize) {
        return new StreamByteChunker(minChunkSize, maxChunkSize);
    }

    /** Returns the consumer side of this transformer. */
    @Override
    public StreamConsumer<ByteBuf> getInput() {
        return this.input;
    }

    /** Returns the producer side of this transformer. */
    @Override
    public StreamProducer<ByteBuf> getOutput() {
        return this.output;
    }

    /**
     * Producer side of the chunker. It also acts as the data receiver for the upstream
     * producer, so incoming buffers arrive through {@link #onData(ByteBuf)}.
     */
    protected final class Output
    extends AbstractStreamProducer<ByteBuf>
    implements StreamDataReceiver<ByteBuf> {
        private final int minChunkSize;
        private final int maxChunkSize;
        // Bytes received but not yet sent downstream. Becomes null once it has been handed
        // downstream at end of stream, or once cleanup() has recycled it.
        private ByteBuf internalBuf = ByteBuf.empty();

        /**
         * @param minChunkSize lower bound for emitted chunks; must be positive
         * @param maxChunkSize upper bound for emitted chunks; must be at least {@code minChunkSize}
         * @throws IllegalArgumentException if {@code minChunkSize <= 0} or
         *                                  {@code maxChunkSize < minChunkSize}
         */
        public Output(MemSize minChunkSize, MemSize maxChunkSize) {
            int min = minChunkSize.toInt();
            int max = maxChunkSize.toInt();
            // A non-positive minimum keeps the "readRemaining() >= minChunkSize" loop conditions
            // true even when no bytes are left, so the flush loops would send empty slices
            // for as long as the receiver stays ready.
            if (min <= 0) {
                throw new IllegalArgumentException("minChunkSize must be positive, got " + min);
            }
            // The flush loop relies on minChunkSize <= chunkSize <= maxChunkSize.
            if (max < min) {
                throw new IllegalArgumentException(
                        "maxChunkSize (" + max + ") must not be less than minChunkSize (" + min + ")");
            }
            this.minChunkSize = min;
            this.maxChunkSize = max;
        }

        /**
         * Flushes whatever is already buffered, then, if the receiver is still ready,
         * asks the upstream producer to deliver data to this object.
         */
        @Override
        protected void produce() {
            this.tryFlushAndClose();
            if (this.isReceiverReady()) {
                StreamByteChunker.this.input.getProducer().produce(this);
            }
        }

        /** Propagates suspension to the upstream producer. */
        @Override
        protected void onSuspended() {
            StreamByteChunker.this.input.getProducer().suspend();
        }

        /**
         * Accepts a buffer from upstream. This method recycles {@code buf} before returning.
         *
         * @param buf incoming data
         */
        @Override
        public void onData(ByteBuf buf) {
            // Fast path: nothing is pending, so full chunks can be sliced straight out of
            // the incoming buffer without copying them into internalBuf first.
            if (!this.internalBuf.canRead()) {
                while (this.isReceiverReady() && buf.readRemaining() >= this.minChunkSize) {
                    int chunkSize = Math.min(buf.readRemaining(), this.maxChunkSize);
                    ByteBuf slice = buf.slice(chunkSize);
                    this.send(slice);
                    buf.moveReadPosition(chunkSize);
                }
            }
            // Anything left over is appended to the internal buffer.
            if (buf.canRead()) {
                int requiredWriteRemaining =
                        Math.max(this.maxChunkSize - this.internalBuf.writePosition(), buf.readRemaining());
                this.internalBuf = ByteBufPool.ensureWriteRemaining(this.internalBuf, requiredWriteRemaining);
                this.internalBuf.put(buf);
            }
            buf.recycle();
            this.tryFlushAndClose();
        }

        /**
         * Sends as many chunks of at least {@code minChunkSize} bytes from the internal buffer
         * as the receiver accepts. If the receiver is still ready and the input has reached
         * end of stream, sends the remaining bytes (if any) and then end of stream.
         */
        private void tryFlushAndClose() {
            while (this.isReceiverReady() && this.internalBuf.readRemaining() >= this.minChunkSize) {
                int chunkSize = Math.min(this.internalBuf.readRemaining(), this.maxChunkSize);
                assert (chunkSize >= this.minChunkSize && chunkSize <= this.maxChunkSize);
                ByteBuf slice = this.internalBuf.slice(this.internalBuf.readPosition(), chunkSize);
                this.send(slice);
                this.internalBuf.moveReadPosition(chunkSize);
            }
            if (!this.isReceiverReady()) {
                return;
            }
            if (StreamByteChunker.this.input.getStatus() == StreamStatus.END_OF_STREAM) {
                if (this.internalBuf.canRead()) {
                    // The buffer itself is sent downstream, so drop the reference here to
                    // keep cleanup() from recycling it.
                    StreamByteChunker.this.output.send(this.internalBuf);
                    this.internalBuf = null;
                }
                StreamByteChunker.this.output.sendEndOfStream();
            }
        }

        /** Forwards an error on the output side to the input side. */
        @Override
        protected void onError(Throwable t) {
            StreamByteChunker.this.input.closeWithError(t);
        }

        /** Recycles the internal buffer if this object still holds it. */
        @Override
        protected void cleanup() {
            if (this.internalBuf != null) {
                this.internalBuf.recycle();
                this.internalBuf = null;
            }
        }
    }

    /** Consumer side of the chunker; forwards end-of-stream and errors to the output. */
    protected final class Input
    extends AbstractStreamConsumer<ByteBuf> {
        protected Input() {
        }

        /** Asks the output to flush the remaining bytes and close. */
        @Override
        protected void onEndOfStream() {
            StreamByteChunker.this.output.tryFlushAndClose();
        }

        /** Forwards an error on the input side to the output side. */
        @Override
        protected void onError(Throwable t) {
            StreamByteChunker.this.output.closeWithError(t);
        }
    }
}

