package io.datakernel.rpc.protocol;

import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.DataStreams;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.net.SocketStreamingConnection;
import io.datakernel.stream.processor.StreamBinaryDeserializer;
import io.datakernel.stream.processor.StreamBinarySerializer;
import io.datakernel.stream.processor.StreamLZ4Compressor;
import io.datakernel.stream.processor.StreamLZ4Decompressor;
import io.datakernel.util.MemSize;
import java.time.Duration;

/* loaded from: input_file:io/datakernel/rpc/protocol/RpcStream.class */
public final class RpcStream {
    private Listener listener;
    private final AbstractStreamProducer<RpcMessage> sender;
    private final AbstractStreamConsumer<RpcMessage> receiver;
    private final SocketStreamingConnection connection;
    private boolean ready;
    private StreamDataReceiver<RpcMessage> downstreamDataReceiver;

    /* loaded from: input_file:io/datakernel/rpc/protocol/RpcStream$Listener.class */
    public interface Listener extends StreamDataReceiver<RpcMessage> {
        void onClosedWithError(Throwable th);

        void onReadEndOfStream();
    }

    public RpcStream(AsyncTcpSocket asyncTcpSocket, BufferSerializer<RpcMessage> bufferSerializer, MemSize memSize, MemSize memSize2, Duration duration, boolean z, boolean z2) {
        this.connection = SocketStreamingConnection.create(asyncTcpSocket);
        if (z2) {
            this.sender = new AbstractStreamProducer<RpcMessage>() { // from class: io.datakernel.rpc.protocol.RpcStream.1
                protected void onProduce(StreamDataReceiver<RpcMessage> streamDataReceiver) {
                    RpcStream.this.downstreamDataReceiver = streamDataReceiver;
                    RpcStream.this.receiver.getProducer().produce(RpcStream.this.listener);
                    RpcStream.this.ready = true;
                }

                protected void onSuspended() {
                    RpcStream.this.receiver.getProducer().suspend();
                    RpcStream.this.ready = false;
                }

                protected void onError(Throwable th) {
                    RpcStream.this.listener.onClosedWithError(th);
                    RpcStream.this.ready = false;
                }
            };
        } else {
            this.sender = new AbstractStreamProducer<RpcMessage>() { // from class: io.datakernel.rpc.protocol.RpcStream.2
                protected void onProduce(StreamDataReceiver<RpcMessage> streamDataReceiver) {
                    RpcStream.this.downstreamDataReceiver = streamDataReceiver;
                    RpcStream.this.ready = true;
                }

                protected void onSuspended() {
                    RpcStream.this.ready = false;
                }

                protected void onError(Throwable th) {
                    RpcStream.this.listener.onClosedWithError(th);
                    RpcStream.this.ready = false;
                }
            };
        }
        this.receiver = new AbstractStreamConsumer<RpcMessage>() { // from class: io.datakernel.rpc.protocol.RpcStream.3
            protected void onStarted() {
                getProducer().produce(RpcStream.this.listener);
            }

            protected void onEndOfStream() {
                RpcStream.this.listener.onReadEndOfStream();
            }

            protected void onError(Throwable th) {
            }
        };
        StreamBinarySerializer withSkipSerializationErrors = StreamBinarySerializer.create(bufferSerializer).withInitialBufferSize(memSize).withMaxMessageSize(memSize2).withAutoFlushInterval(duration).withSkipSerializationErrors();
        StreamBinaryDeserializer create = StreamBinaryDeserializer.create(bufferSerializer);
        if (z) {
            StreamLZ4Compressor fastCompressor = StreamLZ4Compressor.fastCompressor();
            StreamLZ4Decompressor create2 = StreamLZ4Decompressor.create();
            DataStreams.stream(this.connection.getSocketReader(), create2.getInput());
            DataStreams.stream(create2.getOutput(), create.getInput());
            DataStreams.stream(withSkipSerializationErrors.getOutput(), fastCompressor.getInput());
            DataStreams.stream(fastCompressor.getOutput(), this.connection.getSocketWriter());
        } else {
            DataStreams.stream(this.connection.getSocketReader(), create.getInput());
            DataStreams.stream(withSkipSerializationErrors.getOutput(), this.connection.getSocketWriter());
        }
        DataStreams.stream(create.getOutput(), this.receiver);
        DataStreams.stream(this.sender, withSkipSerializationErrors.getInput());
    }

    public void setListener(Listener listener) {
        this.listener = listener;
    }

    public void sendMessage(RpcMessage rpcMessage) {
        sendRpcMessage(rpcMessage);
    }

    public void sendCloseMessage() {
        sendRpcMessage(RpcMessage.of(-1, RpcControlMessage.CLOSE));
    }

    private void sendRpcMessage(RpcMessage rpcMessage) {
        if (this.ready) {
            this.downstreamDataReceiver.onData(rpcMessage);
        }
    }

    public boolean isOverloaded() {
        return !this.ready;
    }

    public AsyncTcpSocket.EventHandler getSocketEventHandler() {
        return this.connection;
    }

    public void sendEndOfStream() {
        this.sender.sendEndOfStream();
    }
}
