/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.net;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufQueue;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.stream.AbstractStreamProducer;

final class SocketStreamProducer
extends AbstractStreamProducer<ByteBuf> {
    private final AsyncTcpSocket asyncTcpSocket;
    protected final ByteBufQueue readQueue = ByteBufQueue.create();
    private boolean readEndOfStream;

    private SocketStreamProducer(AsyncTcpSocket asyncTcpSocket) {
        this.asyncTcpSocket = asyncTcpSocket;
    }

    public static SocketStreamProducer create(AsyncTcpSocket asyncTcpSocket) {
        return new SocketStreamProducer(asyncTcpSocket);
    }

    @Override
    protected void produce() {
        ByteBuf buf;
        while (this.isReceiverReady() && this.readQueue.hasRemaining()) {
            buf = this.readQueue.take();
            this.send(buf);
        }
        if (this.readEndOfStream) {
            if (this.isReceiverReady()) {
                if (this.readQueue.hasRemaining()) {
                    buf = this.readQueue.takeRemaining();
                    this.send(buf);
                }
                this.sendEndOfStream();
            }
        } else if (this.readQueue.remainingBufs() <= 1) {
            this.asyncTcpSocket.read();
        }
    }

    @Override
    protected void onError(Throwable t) {
    }

    public void onRead(ByteBuf buf) {
        this.readQueue.add(buf);
        this.produce();
    }

    public void onReadEndOfStream() {
        this.readEndOfStream = true;
        this.produce();
    }

    public boolean isClosed() {
        return !this.isWired() || this.getStatus().isClosed();
    }
}

