package io.datakernel.rpc.protocol;

import io.datakernel.common.MemSize;
import io.datakernel.common.exception.CloseException;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.process.ChannelLZ4Compressor;
import io.datakernel.csp.process.ChannelLZ4Decompressor;
import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.AbstractStreamSupplier;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.csp.ChannelDeserializer;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.net.AsyncTcpSocket;
import io.datakernel.promise.Promise;
import io.datakernel.serializer.BinarySerializer;
import java.time.Duration;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/datakernel/rpc/protocol/RpcStream.class */
public final class RpcStream {
    private static final CloseException RPC_CLOSE_EXCEPTION = new CloseException(RpcStream.class, "RPC Channel Closed");
    private final boolean server;
    private final AsyncTcpSocket socket;
    private Listener listener;
    private final AbstractStreamSupplier<RpcMessage> sender;
    private final AbstractStreamConsumer<RpcMessage> receiver;

    /* loaded from: input_file:io/datakernel/rpc/protocol/RpcStream$Listener.class */
    public interface Listener extends StreamDataAcceptor<RpcMessage> {
        void onReceiverEndOfStream();

        void onReceiverError(@NotNull Throwable th);

        void onSenderError(@NotNull Throwable th);

        void onSenderReady(@NotNull StreamDataAcceptor<RpcMessage> streamDataAcceptor);

        void onSenderSuspended();
    }

    public RpcStream(AsyncTcpSocket asyncTcpSocket, BinarySerializer<RpcMessage> binarySerializer, MemSize memSize, MemSize memSize2, Duration duration, boolean z, boolean z2) {
        this.server = z2;
        this.socket = asyncTcpSocket;
        if (this.server) {
            this.sender = new AbstractStreamSupplier<RpcMessage>() { // from class: io.datakernel.rpc.protocol.RpcStream.1
                protected void onProduce(@NotNull StreamDataAcceptor<RpcMessage> streamDataAcceptor) {
                    RpcStream.this.receiver.getSupplier().resume(RpcStream.this.listener);
                    RpcStream.this.listener.onSenderReady(streamDataAcceptor);
                }

                protected void onSuspended() {
                    RpcStream.this.receiver.getSupplier().suspend();
                    RpcStream.this.listener.onSenderSuspended();
                }

                protected void onError(Throwable th) {
                    if (th != RpcStream.RPC_CLOSE_EXCEPTION) {
                        RpcStream.this.listener.onSenderError(th);
                    }
                }
            };
        } else {
            this.sender = new AbstractStreamSupplier<RpcMessage>() { // from class: io.datakernel.rpc.protocol.RpcStream.2
                protected void onProduce(@NotNull StreamDataAcceptor<RpcMessage> streamDataAcceptor) {
                    RpcStream.this.listener.onSenderReady(streamDataAcceptor);
                }

                protected void onSuspended() {
                    RpcStream.this.listener.onSenderSuspended();
                }

                protected void onError(Throwable th) {
                    if (th != RpcStream.RPC_CLOSE_EXCEPTION) {
                        RpcStream.this.listener.onSenderError(th);
                    }
                }
            };
        }
        this.receiver = new AbstractStreamConsumer<RpcMessage>() { // from class: io.datakernel.rpc.protocol.RpcStream.3
            protected void onStarted() {
                getSupplier().resume(RpcStream.this.listener);
            }

            protected Promise<Void> onEndOfStream() {
                RpcStream.this.listener.onReceiverEndOfStream();
                return Promise.complete();
            }

            protected void onError(Throwable th) {
                if (th != RpcStream.RPC_CLOSE_EXCEPTION) {
                    RpcStream.this.listener.onReceiverError(th);
                }
            }
        };
        ChannelSerializer withSkipSerializationErrors = ChannelSerializer.create(binarySerializer).withInitialBufferSize(memSize).withMaxMessageSize(memSize2).withAutoFlushInterval(duration).withSkipSerializationErrors();
        ChannelDeserializer create = ChannelDeserializer.create(binarySerializer);
        if (z) {
            ChannelLZ4Decompressor create2 = ChannelLZ4Decompressor.create();
            ChannelLZ4Compressor createFastCompressor = ChannelLZ4Compressor.createFastCompressor();
            ChannelSupplier.ofSocket(asyncTcpSocket).bindTo(create2.getInput());
            create2.getOutput().bindTo(create.getInput());
            withSkipSerializationErrors.getOutput().bindTo(createFastCompressor.getInput());
            createFastCompressor.getOutput().set(ChannelConsumer.ofSocket(asyncTcpSocket));
        } else {
            ChannelSupplier.ofSocket(asyncTcpSocket).bindTo(create.getInput());
            withSkipSerializationErrors.getOutput().set(ChannelConsumer.ofSocket(asyncTcpSocket));
        }
        create.streamTo(this.receiver);
        this.sender.streamTo(withSkipSerializationErrors);
    }

    public void setListener(Listener listener) {
        this.listener = listener;
    }

    public void sendEndOfStream() {
        this.sender.sendEndOfStream();
    }

    public void close() {
        Eventloop.getCurrentEventloop().post(() -> {
            this.socket.close(RPC_CLOSE_EXCEPTION);
        });
    }
}
