package io.vlingo.wire.fdx.bidirectional.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ServerTransport;
import io.vlingo.actors.Actor;
import io.vlingo.actors.Logger;
import io.vlingo.actors.Stoppable;
import io.vlingo.common.Completes;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;
import io.vlingo.wire.fdx.bidirectional.ServerRequestResponseChannel;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/vlingo/wire/fdx/bidirectional/rsocket/RSocketServerChannelActor.class */
public class RSocketServerChannelActor extends Actor implements ServerRequestResponseChannel {
    private final String name;
    private final Closeable serverSocket;
    private final Integer port;

    /* loaded from: input_file:io/vlingo/wire/fdx/bidirectional/rsocket/RSocketServerChannelActor$SocketAcceptorImpl.class */
    private static class SocketAcceptorImpl implements SocketAcceptor {
        private final RSocket acceptor;

        private SocketAcceptorImpl(final RequestChannelConsumerProvider requestChannelConsumerProvider, final int i, final int i2, final Logger logger) {
            this.acceptor = new AbstractRSocket() { // from class: io.vlingo.wire.fdx.bidirectional.rsocket.RSocketServerChannelActor.SocketAcceptorImpl.1
                public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
                    RSocketChannelContext rSocketChannelContext = new RSocketChannelContext(requestChannelConsumerProvider, i, i2, logger);
                    Flux subscribeOn = Flux.from(publisher).subscribeOn(Schedulers.single());
                    rSocketChannelContext.getClass();
                    Flux doOnNext = subscribeOn.doOnNext(rSocketChannelContext::consume);
                    Logger logger2 = logger;
                    doOnNext.doOnError(th -> {
                        logger2.error("Unexpected error when consuming channel request", th);
                    }).subscribe();
                    return Flux.from(rSocketChannelContext.processor());
                }
            };
        }

        public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
            return Mono.just(this.acceptor);
        }
    }

    public RSocketServerChannelActor(RequestChannelConsumerProvider requestChannelConsumerProvider, ServerTransport<? extends Closeable> serverTransport, int i, String str, int i2, int i3) {
        this.name = str;
        this.port = Integer.valueOf(i);
        this.serverSocket = (Closeable) RSocketFactory.receive().errorConsumer(th -> {
            logger().error("Unexpected error in server channel", th);
        }).frameDecoder(PayloadDecoder.ZERO_COPY).acceptor(new SocketAcceptorImpl(requestChannelConsumerProvider, i2, i3, logger())).transport(serverTransport).start().block();
        if (this.serverSocket != null) {
            logger().info("RSocket server channel opened at port {}", new Object[]{this.port});
            this.serverSocket.onClose().doFinally(signalType -> {
                logger().info("RSocket server channel closed");
            }).subscribe();
        }
    }

    @Override // io.vlingo.wire.fdx.bidirectional.ServerRequestResponseChannel
    public void close() {
        if (isStopped()) {
            return;
        }
        if (this.serverSocket != null) {
            try {
                this.serverSocket.dispose();
            } catch (Exception e) {
                logger().error("Failed to close receive socket for: {}", new Object[]{this.name, e});
            }
        }
        ((Stoppable) selfAs(Stoppable.class)).stop();
    }

    @Override // io.vlingo.wire.fdx.bidirectional.ServerRequestResponseChannel
    public Completes<Integer> port() {
        return completes().with(this.port);
    }

    public void stop() {
        super.stop();
    }
}
