/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.net;

import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamStatus;

/**
 * Stream consumer that forwards every received {@link ByteBuf} to an
 * {@link AsyncTcpSocket} and exposes a stage that is completed once
 * {@link #onWrite()} is invoked while the stream status is
 * {@code END_OF_STREAM}, or failed when {@link #onError(Throwable)} is called.
 *
 * <p>The class acts as its own {@link StreamDataReceiver}: it passes itself to
 * the producer's {@code produce} call.
 */
final class SocketStreamConsumer extends AbstractStreamConsumer<ByteBuf>
        implements StreamDataReceiver<ByteBuf> {

    /** Socket that receives the buffers and the end-of-stream marker. */
    private final AsyncTcpSocket asyncTcpSocket;

    /** Stage set in {@link #onWrite()} after end of stream, or failed in {@link #onError(Throwable)}. */
    private final SettableStage<Void> sentStage;

    /**
     * Value of {@code eventloop.getLoop()} observed at the most recent write,
     * or {@code 0} when no write has been recorded since the last
     * {@link #onWrite()}.
     * NOTE(review): presumably an eventloop iteration counter — confirm.
     */
    private int writeLoop;

    /** Becomes {@code true} once {@link #onWrite()} has completed {@link #sentStage}. */
    private boolean sent;

    private SocketStreamConsumer(AsyncTcpSocket asyncTcpSocket, SettableStage<Void> sentStage) {
        this.asyncTcpSocket = asyncTcpSocket;
        this.sentStage = sentStage;
    }

    /**
     * Creates a consumer bound to the given socket with a freshly created sent stage.
     *
     * @param asyncTcpSocket socket the consumer writes to
     * @return a new consumer instance
     */
    public static SocketStreamConsumer create(AsyncTcpSocket asyncTcpSocket) {
        return new SocketStreamConsumer(asyncTcpSocket, SettableStage.<Void>create());
    }

    /** Asks the producer to start delivering data to this instance. */
    @Override
    protected void onStarted() {
        getProducer().produce(this);
    }

    /** Writes the end-of-stream marker to the socket. */
    @Override
    public void onEndOfStream() {
        asyncTcpSocket.writeEndOfStream();
    }

    /**
     * Fails the sent stage with the given error.
     *
     * @param t the error reported for this consumer
     */
    @Override
    protected void onError(Throwable t) {
        sentStage.setException(t);
    }

    /**
     * Writes the buffer to the socket, or recycles it when the consumer's
     * status is already closed.
     *
     * <p>After writing, the current {@code eventloop.getLoop()} value is
     * stored in {@link #writeLoop}. If a different non-zero value had been
     * stored before, the producer is suspended.
     *
     * @param buf buffer to send; recycled here if the consumer is closed
     */
    @Override
    public void onData(ByteBuf buf) {
        if (!getStatus().isClosed()) {
            asyncTcpSocket.write(buf);
            final int currentLoop = eventloop.getLoop();
            final int previousLoop = writeLoop;
            // The stored value always ends up equal to the current loop value.
            writeLoop = currentLoop;
            // Suspend only when an earlier write was recorded under a different loop value.
            if (previousLoop != 0 && previousLoop != currentLoop) {
                getProducer().suspend();
            }
        } else {
            buf.recycle();
        }
    }

    /**
     * Resets the write bookkeeping. While the status is open the producer is
     * asked to produce again; when the status is {@code END_OF_STREAM} the
     * consumer is marked as sent and the sent stage is completed.
     * NOTE(review): presumably invoked by the socket's owner after a write has
     * been flushed — the caller is not visible here, confirm.
     */
    public void onWrite() {
        writeLoop = 0;
        if (getStatus().isOpen()) {
            getProducer().produce(this);
            return;
        }
        if (getStatus() == StreamStatus.END_OF_STREAM) {
            sent = true;
            sentStage.set(null);
        }
    }

    /**
     * Reports whether this consumer is finished from the socket's perspective.
     *
     * @return {@code true} if the consumer is not wired, if everything has
     *         been sent, or if the status is {@code CLOSED_WITH_ERROR}
     */
    public boolean isClosed() {
        if (!isWired()) {
            return true;
        }
        if (sent) {
            return true;
        }
        return getStatus() == StreamStatus.CLOSED_WITH_ERROR;
    }

    /**
     * Returns the stage tracking completion of sending.
     *
     * @return the stage set by {@link #onWrite()} or failed by {@link #onError(Throwable)}
     */
    public Stage<Void> getSentStage() {
        return sentStage;
    }
}

