package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.ServerSetup;
import io.rsocket.exceptions.InvalidSetupException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.lease.Leases;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.lease.ResponderLeaseHandler;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import io.rsocket.plugins.InterceptorRegistry;
import io.rsocket.resume.SessionManager;
import io.rsocket.transport.ServerTransport;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/rsocket/core/RSocketServer.class */
public final class RSocketServer {
    private static final String SERVER_TAG = "server";
    private Resume resume;
    private SocketAcceptor acceptor = SocketAcceptor.with(new RSocket() { // from class: io.rsocket.core.RSocketServer.1
    });
    private InitializingInterceptorRegistry interceptors = new InitializingInterceptorRegistry();
    private Supplier<Leases<?>> leasesSupplier = null;
    private int mtu = 0;
    private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.rsocket.core.RSocketServer$4, reason: invalid class name */
    /* loaded from: input_file:io/rsocket/core/RSocketServer$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$frame$FrameType = new int[FrameType.values().length];

        static {
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.SETUP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.RESUME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    private RSocketServer() {
    }

    public static RSocketServer create() {
        return new RSocketServer();
    }

    public static RSocketServer create(SocketAcceptor socketAcceptor) {
        return create().acceptor(socketAcceptor);
    }

    public RSocketServer acceptor(SocketAcceptor socketAcceptor) {
        Objects.requireNonNull(socketAcceptor);
        this.acceptor = socketAcceptor;
        return this;
    }

    public RSocketServer interceptors(Consumer<InterceptorRegistry> consumer) {
        consumer.accept(this.interceptors);
        return this;
    }

    public RSocketServer resume(Resume resume) {
        this.resume = resume;
        return this;
    }

    public RSocketServer lease(Supplier<Leases<?>> supplier) {
        this.leasesSupplier = supplier;
        return this;
    }

    public RSocketServer fragment(int i) {
        if ((i > 0 && i < 64) || i < 0) {
            throw new IllegalArgumentException(String.format("The smallest allowed mtu size is %d bytes, provided: %d", 64, Integer.valueOf(i)));
        }
        this.mtu = i;
        return this;
    }

    public RSocketServer payloadDecoder(PayloadDecoder payloadDecoder) {
        Objects.requireNonNull(payloadDecoder);
        this.payloadDecoder = payloadDecoder;
        return this;
    }

    public <T extends Closeable> Mono<T> bind(final ServerTransport<T> serverTransport) {
        return Mono.defer(new Supplier<Mono<T>>() { // from class: io.rsocket.core.RSocketServer.2
            ServerSetup serverSetup;

            {
                this.serverSetup = RSocketServer.this.serverSetup();
            }

            @Override // java.util.function.Supplier
            public Mono<T> get() {
                return serverTransport.start(duplexConnection -> {
                    return RSocketServer.this.acceptor(this.serverSetup, duplexConnection);
                }, RSocketServer.this.mtu).doOnNext(closeable -> {
                    closeable.onClose().doFinally(signalType -> {
                        this.serverSetup.dispose();
                    }).subscribe();
                });
            }
        });
    }

    public <T extends Closeable> T bindNow(ServerTransport<T> serverTransport) {
        return (T) bind(serverTransport).block();
    }

    public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
        return new ServerTransport.ConnectionAcceptor() { // from class: io.rsocket.core.RSocketServer.3
            private final ServerSetup serverSetup;

            {
                this.serverSetup = RSocketServer.this.serverSetup();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.rsocket.transport.ServerTransport.ConnectionAcceptor, java.util.function.Function
            public Mono<Void> apply(DuplexConnection duplexConnection) {
                return RSocketServer.this.acceptor(this.serverSetup, duplexConnection);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection duplexConnection) {
        ClientServerInputMultiplexer clientServerInputMultiplexer = new ClientServerInputMultiplexer(duplexConnection, this.interceptors, false);
        return clientServerInputMultiplexer.asSetupConnection().receive().next().flatMap(byteBuf -> {
            return accept(serverSetup, byteBuf, clientServerInputMultiplexer);
        });
    }

    private Mono<Void> acceptResume(ServerSetup serverSetup, ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer) {
        return serverSetup.acceptRSocketResume(byteBuf, clientServerInputMultiplexer);
    }

    private Mono<Void> accept(ServerSetup serverSetup, ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer) {
        switch (AnonymousClass4.$SwitchMap$io$rsocket$frame$FrameType[FrameHeaderCodec.frameType(byteBuf).ordinal()]) {
            case ErrorFrameCodec.INVALID_SETUP /* 1 */:
                return acceptSetup(serverSetup, byteBuf, clientServerInputMultiplexer);
            case ErrorFrameCodec.UNSUPPORTED_SETUP /* 2 */:
                return acceptResume(serverSetup, byteBuf, clientServerInputMultiplexer);
            default:
                return serverSetup.sendError(clientServerInputMultiplexer, new InvalidSetupException("invalid setup frame: " + FrameHeaderCodec.frameType(byteBuf))).doFinally(signalType -> {
                    byteBuf.release();
                    clientServerInputMultiplexer.dispose();
                });
        }
    }

    private Mono<Void> acceptSetup(ServerSetup serverSetup, ByteBuf byteBuf, ClientServerInputMultiplexer clientServerInputMultiplexer) {
        if (!SetupFrameCodec.isSupportedVersion(byteBuf)) {
            return serverSetup.sendError(clientServerInputMultiplexer, new InvalidSetupException("Unsupported version: " + SetupFrameCodec.humanReadableVersion(byteBuf))).doFinally(signalType -> {
                byteBuf.release();
                clientServerInputMultiplexer.dispose();
            });
        }
        boolean z = this.leasesSupplier != null;
        return (!SetupFrameCodec.honorLease(byteBuf) || z) ? serverSetup.acceptRSocketSetup(byteBuf, clientServerInputMultiplexer, (keepAliveHandler, clientServerInputMultiplexer2) -> {
            DefaultConnectionSetupPayload defaultConnectionSetupPayload = new DefaultConnectionSetupPayload(byteBuf);
            Leases<?> leases = z ? this.leasesSupplier.get() : null;
            return this.interceptors.initSocketAcceptor(this.acceptor).accept(defaultConnectionSetupPayload, this.interceptors.initRequester(new RSocketRequester(clientServerInputMultiplexer2.asServerConnection(), this.payloadDecoder, StreamIdSupplier.serverSupplier(), this.mtu, defaultConnectionSetupPayload.keepAliveInterval(), defaultConnectionSetupPayload.keepAliveMaxLifetime(), keepAliveHandler, z ? new RequesterLeaseHandler.Impl(SERVER_TAG, leases.receiver()) : RequesterLeaseHandler.None, Schedulers.single(Schedulers.parallel())))).onErrorResume(th -> {
                return serverSetup.sendError(clientServerInputMultiplexer, rejectedSetupError(th)).then(Mono.error(th));
            }).doOnNext(rSocket -> {
                RSocket initResponder = this.interceptors.initResponder(rSocket);
                DuplexConnection asClientConnection = clientServerInputMultiplexer2.asClientConnection();
                new RSocketResponder(asClientConnection, initResponder, this.payloadDecoder, z ? new ResponderLeaseHandler.Impl(SERVER_TAG, asClientConnection.alloc(), leases.sender(), leases.stats()) : ResponderLeaseHandler.None, this.mtu);
            }).doFinally(signalType2 -> {
                defaultConnectionSetupPayload.release();
            }).then();
        }) : serverSetup.sendError(clientServerInputMultiplexer, new InvalidSetupException("lease is not supported")).doFinally(signalType2 -> {
            byteBuf.release();
            clientServerInputMultiplexer.dispose();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ServerSetup serverSetup() {
        return this.resume != null ? createSetup() : new ServerSetup.DefaultServerSetup();
    }

    ServerSetup createSetup() {
        return new ServerSetup.ResumableServerSetup(new SessionManager(), this.resume.getSessionDuration(), this.resume.getStreamTimeout(), this.resume.getStoreFactory(SERVER_TAG), this.resume.isCleanupStoreOnKeepAlive());
    }

    private Exception rejectedSetupError(Throwable th) {
        String message = th.getMessage();
        return new RejectedSetupException(message == null ? "rejected by server acceptor" : message);
    }
}
