package io.vlingo.wire.fdx.inbound.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.vlingo.actors.Logger;
import io.vlingo.wire.channel.ChannelMessageDispatcher;
import io.vlingo.wire.channel.ChannelReader;
import io.vlingo.wire.channel.ChannelReaderConsumer;
import io.vlingo.wire.message.RawMessageBuilder;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/vlingo/wire/fdx/inbound/rsocket/RSocketChannelInboundReader.class */
public class RSocketChannelInboundReader implements ChannelReader, ChannelMessageDispatcher {
    private final Logger logger;
    private final String name;
    private final int port;
    private boolean closed = false;
    private final int maxMessageSize;
    private CloseableChannel serverSocket;
    private ChannelReaderConsumer consumer;

    /* loaded from: input_file:io/vlingo/wire/fdx/inbound/rsocket/RSocketChannelInboundReader$SocketAcceptorImpl.class */
    private static class SocketAcceptorImpl implements SocketAcceptor {
        private final RSocket acceptor;

        private SocketAcceptorImpl(final ChannelMessageDispatcher channelMessageDispatcher, final String str, int i, final Logger logger) {
            final RawMessageBuilder rawMessageBuilder = new RawMessageBuilder(i);
            this.acceptor = new AbstractRSocket() { // from class: io.vlingo.wire.fdx.inbound.rsocket.RSocketChannelInboundReader.SocketAcceptorImpl.1
                public Mono<Void> fireAndForget(Payload payload) {
                    try {
                        rawMessageBuilder.workBuffer().put(payload.getData());
                        channelMessageDispatcher.dispatchMessagesFor(rawMessageBuilder);
                    } catch (Throwable th) {
                        logger.error("Unexpected error in inbound channel {}. Message ignored.", new Object[]{str, th});
                        rawMessageBuilder.prepareForNextMessage();
                        rawMessageBuilder.workBuffer().clear();
                    } finally {
                        payload.release();
                    }
                    return Mono.empty();
                }
            };
        }

        public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
            return Mono.just(this.acceptor);
        }
    }

    public RSocketChannelInboundReader(int i, String str, int i2, Logger logger) {
        this.logger = logger;
        this.name = str;
        this.port = i;
        this.maxMessageSize = i2;
    }

    @Override // io.vlingo.wire.channel.ChannelMessageDispatcher
    public ChannelReaderConsumer consumer() {
        return this.consumer;
    }

    @Override // io.vlingo.wire.channel.ChannelMessageDispatcher
    public Logger logger() {
        return this.logger;
    }

    @Override // io.vlingo.wire.channel.ChannelReader
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (this.serverSocket == null || this.serverSocket.isDisposed()) {
            return;
        }
        try {
            this.serverSocket.dispose();
        } catch (Throwable th) {
            this.logger.error("Unexpected error on closing inbound channel {}", new Object[]{this.name, th});
        }
    }

    @Override // io.vlingo.wire.channel.ChannelReader, io.vlingo.wire.channel.ChannelMessageDispatcher
    public String name() {
        return this.name;
    }

    @Override // io.vlingo.wire.channel.ChannelReader
    public void openFor(ChannelReaderConsumer channelReaderConsumer) {
        if (this.closed) {
            return;
        }
        this.consumer = channelReaderConsumer;
        if (this.serverSocket != null && !this.serverSocket.isDisposed()) {
            this.serverSocket.dispose();
        }
        this.serverSocket = (CloseableChannel) RSocketFactory.receive().frameDecoder(PayloadDecoder.ZERO_COPY).acceptor(new SocketAcceptorImpl(this, this.name, this.maxMessageSize, this.logger)).transport(TcpServerTransport.create(this.port)).start().doOnError(th -> {
            this.logger.error("Failed to create RSocket inbound channel {} at port {}", new Object[]{this.name, Integer.valueOf(this.port), th});
        }).block();
        if (this.serverSocket != null) {
            this.serverSocket.onClose().doFinally(signalType -> {
                this.logger.info("RSocket inbound channel {} at port {} is closed", new Object[]{this.name, Integer.valueOf(this.port)});
            }).subscribe(r1 -> {
            }, th2 -> {
                this.logger.error("Unexpected error on closing inbound channel {}", new Object[]{this.name, th2});
            });
            logger().info("RSocket inbound channel {} opened at port {}", new Object[]{this.name, Integer.valueOf(this.port)});
        }
    }

    @Override // io.vlingo.wire.channel.ChannelReader
    public void probeChannel() {
    }
}
