/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.net;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.net.SocketStreamConsumer;
import io.datakernel.stream.net.SocketStreamProducer;
import io.datakernel.stream.net.SocketStreaming;

public final class SocketStreamingConnection
implements AsyncTcpSocket.EventHandler,
SocketStreaming {
    private final AsyncTcpSocket asyncTcpSocket;
    private final SocketStreamProducer socketReader;
    private final SocketStreamConsumer socketWriter;

    private SocketStreamingConnection(AsyncTcpSocket asyncTcpSocket) {
        this.asyncTcpSocket = asyncTcpSocket;
        this.socketWriter = SocketStreamConsumer.create(asyncTcpSocket);
        this.socketWriter.getSentStage().whenComplete(($, throwable) -> {
            if (throwable != null) {
                this.socketReader.closeWithError(throwable);
                asyncTcpSocket.close();
            }
        });
        this.socketReader = SocketStreamProducer.create(asyncTcpSocket);
        this.socketReader.getEndOfStream().whenComplete(($, throwable) -> {
            if (throwable != null) {
                this.socketWriter.closeWithError(throwable);
                asyncTcpSocket.close();
            }
        });
    }

    public static SocketStreamingConnection create(AsyncTcpSocket asyncTcpSocket) {
        return new SocketStreamingConnection(asyncTcpSocket);
    }

    @Override
    public StreamConsumer<ByteBuf> getSocketWriter() {
        return this.socketWriter;
    }

    @Override
    public StreamProducer<ByteBuf> getSocketReader() {
        return this.socketReader;
    }

    public void onRegistered() {
    }

    public void onRead(ByteBuf buf) {
        this.socketReader.onRead(buf);
        this.closeIfDone();
    }

    public void onReadEndOfStream() {
        this.socketReader.onReadEndOfStream();
        this.closeIfDone();
    }

    public void onWrite() {
        this.socketWriter.onWrite();
        this.closeIfDone();
    }

    private void closeIfDone() {
        if (this.socketReader.isClosed() && this.socketWriter.isClosed()) {
            this.asyncTcpSocket.close();
        }
    }

    public void onClosedWithError(Exception e) {
        this.socketReader.closeWithError(e);
        this.socketWriter.closeWithError(e);
    }

    public String toString() {
        return "{asyncTcpSocket=" + this.asyncTcpSocket + '}';
    }
}

