package io.datakernel.stream.net;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.NioEventloop;
import io.datakernel.eventloop.TcpSocketConnection;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamConsumers;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducers;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/stream/net/TcpStreamSocketConnection.class */
/**
 * TCP socket connection that exposes its two directions as byte-buffer streams:
 * everything read from the socket is pushed through {@link #socketReader}, and
 * everything streamed into {@link #socketWriter} is written to the socket.
 *
 * <p>Subclasses implement {@link #wire(StreamProducer, StreamConsumer)} to attach their own
 * processing to the two stream ends.
 *
 * <p>NOTE(review): the numeric stream status codes compared below ({@code 2} and {@code 3})
 * are defined by the stream base classes, which are not part of this file. From the way they
 * are used here, {@code 2} presumably means "end of stream received" and {@code >= 3} means
 * "closed (normally or with error)" — confirm against AbstractStreamProducer /
 * AbstractStreamConsumer.
 */
public abstract class TcpStreamSocketConnection extends TcpSocketConnection {
    private static final Logger logger = LoggerFactory.getLogger(TcpStreamSocketConnection.class);

    /** Receive buffer size assigned by the constructor (256 KiB). */
    public static final int DEFAULT_STREAM_BUFFER_SIZE = 262144;

    /** Producer end: emits the buffers read from the socket. */
    protected final Reader socketReader;
    /** Consumer end: buffers streamed into it are written to the socket. */
    protected final Writer socketWriter;
    /** Optional label returned by {@link #toString()}; {@code null} until {@link #setName(String)} is called. */
    private String name;

    /**
     * Stream producer fed by socket reads. Its suspend/resume callbacks toggle the
     * connection's read interest, so a suspended downstream stops further socket reads.
     */
    public final class Reader extends AbstractStreamProducer<ByteBuf> {
        public Reader(Eventloop eventloop) {
            super(eventloop);
        }

        /** Downstream suspended: stop asking for socket reads. */
        @Override
        protected void onSuspended() {
            TcpStreamSocketConnection.this.readInterest(false);
        }

        /** Downstream resumed: ask for socket reads again. */
        @Override
        protected void onResumed() {
            TcpStreamSocketConnection.this.readInterest(true);
        }

        /** Reading side finished: shut down or close the connection as appropriate. */
        @Override
        protected void onClosed() {
            TcpStreamSocketConnection.this.closeIfDone();
        }

        /** Reading side failed: hand the error to the connection's internal exception handler. */
        @Override
        protected void onClosedWithError(Exception exc) {
            TcpStreamSocketConnection.this.onInternalException(exc);
        }
    }

    /**
     * Stream consumer that writes every received buffer to the socket. It acts as its own
     * data receiver and suspends its upstream while the connection's write queue is non-empty.
     */
    public final class Writer extends AbstractStreamConsumer<ByteBuf> implements StreamDataReceiver<ByteBuf> {
        public Writer(Eventloop eventloop) {
            super(eventloop);
        }

        @Override
        public StreamDataReceiver<ByteBuf> getDataReceiver() {
            return this;
        }

        /**
         * Upstream has no more data. When nothing is queued for writing the upstream is closed
         * right away; otherwise nothing happens here and the close is performed from
         * {@link TcpStreamSocketConnection#onWriteFlushed()} once the queue has drained.
         */
        @Override
        public void onEndOfStream() {
            if (TcpStreamSocketConnection.this.writeQueue.isEmpty()) {
                closeUpstream();
                TcpStreamSocketConnection.this.closeIfDone();
            }
        }

        /** Upstream failed: hand the error to the connection's internal exception handler. */
        @Override
        public void onError(Exception exc) {
            TcpStreamSocketConnection.this.onInternalException(exc);
        }

        /**
         * Writes the buffer to the socket, then applies back-pressure: the upstream stays
         * resumed only if the write queue is empty after the write.
         */
        @Override
        public void onData(ByteBuf byteBuf) {
            TcpStreamSocketConnection.this.write(byteBuf);
            if (TcpStreamSocketConnection.this.writeQueue.isEmpty()) {
                resumeUpstream();
            } else {
                suspendUpstream();
            }
        }
    }

    /**
     * Creates the connection and its reader/writer stream ends on the given event loop.
     *
     * @param nioEventloop  event loop that owns this connection and both stream ends
     * @param socketChannel channel passed to the superclass constructor
     */
    public TcpStreamSocketConnection(NioEventloop nioEventloop, SocketChannel socketChannel) {
        super(nioEventloop, socketChannel);
        this.receiveBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
        this.socketReader = new Reader(nioEventloop);
        this.socketWriter = new Writer(nioEventloop);
    }

    /**
     * Lets the subclass wire both stream ends, then plugs any end it left unconnected:
     * an unwired reader is streamed into a {@code StreamConsumers.Closing} consumer and an
     * unwired writer is fed by a {@code StreamProducers.EndOfStream} producer.
     */
    public void onRegistered() {
        wire(this.socketReader, this.socketWriter);
        if (this.socketReader.getDownstream() == null) {
            this.socketReader.streamTo(new StreamConsumers.Closing(this.eventloop));
        }
        if (this.socketWriter.getUpstream() == null) {
            new StreamProducers.EndOfStream(this.eventloop).streamTo(this.socketWriter);
        }
    }

    /**
     * Connects application logic to the socket streams. Called from {@link #onRegistered()}.
     *
     * @param streamProducer producer of the buffers read from the socket
     * @param streamConsumer consumer whose input is written to the socket
     */
    protected abstract void wire(StreamProducer<ByteBuf> streamProducer, StreamConsumer<ByteBuf> streamConsumer);

    /** Forwards end-of-stream on the read side to the reader's downstream. */
    protected void onReadEndOfStream() {
        logger.trace("onReadEndOfStream for {}", this);
        this.socketReader.sendEndOfStream();
    }

    /**
     * Closes the connection once both directions are finished; when only one direction is
     * finished, shuts down just that half of the channel. Does nothing while the connection
     * is not registered. I/O errors from the half-shutdowns are logged and otherwise ignored.
     */
    public void closeIfDone() {
        if (!isRegistered()) {
            return;
        }

        // Status >= 3 is presumably "closed" or "closed with error" — see the class-level note.
        final boolean readerDone = this.socketReader.getStatus() >= 3;
        final boolean writerDone = this.socketWriter.getUpstreamStatus() >= 3;

        if (readerDone && writerDone) {
            logger.trace("done, closing {}", this);
            close();
            return;
        }
        if (readerDone) {
            try {
                this.channel.shutdownInput();
            } catch (IOException e) {
                logger.error("shutdownInput error {} for {}", e.toString(), this);
            }
        }
        if (writerDone) {
            try {
                this.channel.shutdownOutput();
            } catch (IOException e) {
                logger.error("shutdownOutput error {} for {}", e.toString(), this);
            }
        }
    }

    /**
     * Pushes a buffer read from the socket to the reader's downstream and then invokes the
     * {@link #onRead()} hook. Any exception thrown along the way is passed to the
     * connection's internal exception handler instead of propagating.
     *
     * @param byteBuf data read from the socket
     */
    protected void onRead(ByteBuf byteBuf) {
        assert this.eventloop.inEventloopThread();
        try {
            this.socketReader.send(byteBuf);
            onRead();
        } catch (Exception e) {
            onInternalException(e);
        }
    }

    /** Hook invoked after each buffer has been sent downstream; no-op by default. */
    protected void onRead() {
    }

    /**
     * Called when queued writes have been flushed. Resumes the upstream unless its status is
     * {@code 2} (presumably "end of stream received" — see the class-level note); in that
     * case the upstream is closed, completing the close deferred by
     * {@code Writer.onEndOfStream()}.
     */
    protected void onWriteFlushed() {
        if (this.socketWriter.getUpstreamStatus() != 2) {
            this.socketWriter.resumeUpstream();
        } else {
            this.socketWriter.closeUpstream();
            closeIfDone();
        }
    }

    /**
     * Logs a read failure and propagates it through the reader.
     *
     * @param exc the read error
     */
    protected void onReadException(Exception exc) {
        logger.warn("onReadException", exc);
        this.socketReader.closeWithError(exc);
        this.socketReader.sendError(exc);
    }

    /**
     * Logs a write failure, closes the writer's upstream with that error and then
     * re-evaluates whether the connection can be shut down.
     *
     * @param exc the write error
     */
    protected void onWriteException(Exception exc) {
        logger.warn("onWriteException", exc);
        this.socketWriter.closeUpstreamWithError(exc);
        closeIfDone();
    }

    /**
     * Sets the label returned by {@link #toString()}.
     *
     * @param str the name to use, or {@code null} to fall back to the inherited representation
     */
    public void setName(String str) {
        this.name = str;
    }

    /** Returns the assigned name if one was set, otherwise the inherited string form. */
    @Override
    public String toString() {
        return this.name != null ? this.name : super.toString();
    }
}
