package io.scalecube.services.transport.rsocket.server;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.HeadAndTail;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.exceptions.ExceptionProcessor;
import io.scalecube.services.exceptions.ServiceUnavailableException;
import io.scalecube.services.methods.ServiceMethodRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/server/RSocketServiceAcceptor.class */
public class RSocketServiceAcceptor implements SocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceAcceptor.class);
    private final ServiceMessageCodec messageCodec;
    private final ServiceMethodRegistry methodRegistry;

    public RSocketServiceAcceptor(ServiceMessageCodec serviceMessageCodec, ServiceMethodRegistry serviceMethodRegistry) {
        this.messageCodec = serviceMessageCodec;
        this.methodRegistry = serviceMethodRegistry;
    }

    @Override // io.rsocket.SocketAcceptor
    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        LOGGER.info("Accepted rSocket: {}, connectionSetup: {}", rSocket, connectionSetupPayload);
        return Mono.just(new AbstractRSocket() { // from class: io.scalecube.services.transport.rsocket.server.RSocketServiceAcceptor.1
            @Override // io.rsocket.AbstractRSocket, io.rsocket.RSocket
            public Mono<Payload> requestResponse(Payload payload) {
                return Mono.just(payload).map(this::toMessage).doOnNext(this::checkMethodInvokerExist).flatMap(serviceMessage -> {
                    return RSocketServiceAcceptor.this.methodRegistry.getInvoker(serviceMessage.qualifier()).invokeOne(serviceMessage, ServiceMessageCodec::decodeData);
                }).onErrorResume(th -> {
                    return Mono.just(ExceptionProcessor.toMessage(th));
                }).map(this::toPayload);
            }

            @Override // io.rsocket.AbstractRSocket, io.rsocket.RSocket
            public Flux<Payload> requestStream(Payload payload) {
                return Flux.just(payload).map(this::toMessage).doOnNext(this::checkMethodInvokerExist).flatMap(serviceMessage -> {
                    return RSocketServiceAcceptor.this.methodRegistry.getInvoker(serviceMessage.qualifier()).invokeMany(serviceMessage, ServiceMessageCodec::decodeData);
                }).onErrorResume(th -> {
                    return Flux.just(ExceptionProcessor.toMessage(th));
                }).map(this::toPayload);
            }

            @Override // io.rsocket.AbstractRSocket, io.rsocket.RSocket
            public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
                return Flux.from(HeadAndTail.createFrom(Flux.from(publisher).map(this::toMessage))).flatMap(headAndTail -> {
                    ServiceMessage serviceMessage = (ServiceMessage) headAndTail.head();
                    checkMethodInvokerExist(serviceMessage);
                    return RSocketServiceAcceptor.this.methodRegistry.getInvoker(serviceMessage.qualifier()).invokeBidirectional(Flux.from(headAndTail.tail()).startWith(serviceMessage), ServiceMessageCodec::decodeData);
                }).onErrorResume(th -> {
                    return Flux.just(ExceptionProcessor.toMessage(th));
                }).map(this::toPayload);
            }

            private Payload toPayload(ServiceMessage serviceMessage) {
                return (Payload) RSocketServiceAcceptor.this.messageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
            }

            private ServiceMessage toMessage(Payload payload) {
                return RSocketServiceAcceptor.this.messageCodec.decode(payload.sliceData(), payload.sliceMetadata());
            }

            private void checkMethodInvokerExist(ServiceMessage serviceMessage) {
                if (RSocketServiceAcceptor.this.methodRegistry.containsInvoker(serviceMessage.qualifier())) {
                    return;
                }
                RSocketServiceAcceptor.LOGGER.error("Failed to invoke service with args[{}], No service invoker found by qualifier: {}", serviceMessage, serviceMessage.qualifier());
                throw new ServiceUnavailableException("No service invoker registered at service method registry by qualifier: " + serviceMessage.qualifier());
            }
        });
    }
}
