package io.datakernel.stream.net;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamProducer;

/* loaded from: input_file:io/datakernel/stream/net/SocketStreamingConnection.class */
/**
 * Adapts a single {@link AsyncTcpSocket} to a pair of byte-buffer streams.
 *
 * <p>Socket events received through the {@code AsyncTcpSocket.EventHandler} callbacks are
 * forwarded to an internal {@code SocketStreamProducer} (the reading half) and an internal
 * {@code SocketStreamConsumer} (the writing half). A failure reported by either half is
 * propagated to the other half and the socket is closed; once both halves report that they
 * are closed, the socket is closed as well.
 */
public final class SocketStreamingConnection implements AsyncTcpSocket.EventHandler, SocketStreaming {
    private final AsyncTcpSocket asyncTcpSocket;
    private final SocketStreamProducer socketReader;
    private final SocketStreamConsumer socketWriter;

    /**
     * Creates the reading and writing halves for the given socket and cross-links their
     * failure handling.
     *
     * @param asyncTcpSocket the socket both stream halves operate on
     */
    private SocketStreamingConnection(AsyncTcpSocket asyncTcpSocket) {
        this.asyncTcpSocket = asyncTcpSocket;

        // Both halves are created (writer first, then reader) BEFORE any callback is attached.
        // Each callback dereferences the opposite half, so attaching a callback while the other
        // field is still unassigned would let an early completion observe a null field.
        this.socketWriter = SocketStreamConsumer.create(asyncTcpSocket);
        this.socketReader = SocketStreamProducer.create(asyncTcpSocket);

        // A failed write side tears down the read side and the socket.
        this.socketWriter.getSentStage().whenComplete((ignoredResult, writeError) -> {
            if (writeError != null) {
                this.socketReader.closeWithError(writeError);
                asyncTcpSocket.close();
            }
        });

        // A failed read side tears down the write side and the socket.
        this.socketReader.getEndOfStream().whenComplete((ignoredResult, readError) -> {
            if (readError != null) {
                this.socketWriter.closeWithError(readError);
                asyncTcpSocket.close();
            }
        });
    }

    /**
     * Static factory for a connection bound to the given socket.
     *
     * @param asyncTcpSocket the socket to wrap
     * @return a new connection instance
     */
    public static SocketStreamingConnection create(AsyncTcpSocket asyncTcpSocket) {
        return new SocketStreamingConnection(asyncTcpSocket);
    }

    /**
     * Returns the writing half of this connection.
     *
     * @return the consumer that accepts buffers destined for the socket
     */
    @Override
    public StreamConsumer<ByteBuf> getSocketWriter() {
        return this.socketWriter;
    }

    /**
     * Returns the reading half of this connection.
     *
     * @return the producer that emits buffers received from the socket
     */
    @Override
    public StreamProducer<ByteBuf> getSocketReader() {
        return this.socketReader;
    }

    /** Socket registration callback; this connection takes no action on registration. */
    public void onRegistered() {
    }

    /**
     * Forwards a received buffer to the reading half, then closes the socket if both halves
     * are closed.
     *
     * @param byteBuf the buffer received from the socket
     */
    public void onRead(ByteBuf byteBuf) {
        this.socketReader.onRead(byteBuf);
        closeIfDone();
    }

    /**
     * Forwards the end-of-stream notification to the reading half, then closes the socket if
     * both halves are closed.
     */
    public void onReadEndOfStream() {
        this.socketReader.onReadEndOfStream();
        closeIfDone();
    }

    /**
     * Forwards the write notification to the writing half, then closes the socket if both
     * halves are closed.
     */
    public void onWrite() {
        this.socketWriter.onWrite();
        closeIfDone();
    }

    /** Closes the socket once both the reading and the writing half report being closed. */
    private void closeIfDone() {
        if (this.socketReader.isClosed() && this.socketWriter.isClosed()) {
            this.asyncTcpSocket.close();
        }
    }

    /**
     * Propagates a socket-level failure to both stream halves.
     *
     * @param exc the error the socket was closed with
     */
    public void onClosedWithError(Exception exc) {
        this.socketReader.closeWithError(exc);
        this.socketWriter.closeWithError(exc);
    }

    @Override
    public String toString() {
        return "{asyncTcpSocket=" + this.asyncTcpSocket + '}';
    }
}
