package io.rsocket;

import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.Frame;
import io.rsocket.KeepAliveHandler;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.framing.FrameType;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:io/rsocket/RSocketServer.class */
class RSocketServer implements ResponderRSocket {
    private final DuplexConnection connection;
    private final RSocket requestHandler;
    private final ResponderRSocket responderRSocket;
    private final Function<Frame, ? extends Payload> frameDecoder;
    private final Consumer<Throwable> errorConsumer;
    private final Map<Integer, LimitableRequestPublisher> sendingLimitableSubscriptions;
    private final Map<Integer, Subscription> sendingSubscriptions;
    private final Map<Integer, Processor<Payload, Payload>> channelProcessors;
    private final UnboundedProcessor<Frame> sendProcessor;
    private KeepAliveHandler keepAliveHandler;

    /* renamed from: io.rsocket.RSocketServer$1, reason: invalid class name */
    /* loaded from: input_file:io/rsocket/RSocketServer$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$framing$FrameType = new int[FrameType.values().length];

        static {
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.REQUEST_FNF.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.REQUEST_RESPONSE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.CANCEL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.KEEPALIVE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.REQUEST_N.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.REQUEST_STREAM.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.REQUEST_CHANNEL.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.METADATA_PUSH.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.PAYLOAD.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.LEASE.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.NEXT.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.COMPLETE.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.ERROR.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.NEXT_COMPLETE.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$io$rsocket$framing$FrameType[FrameType.SETUP.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RSocketServer(DuplexConnection duplexConnection, RSocket rSocket, Function<Frame, ? extends Payload> function, Consumer<Throwable> consumer) {
        this(duplexConnection, rSocket, function, consumer, 0L, 0L);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RSocketServer(DuplexConnection duplexConnection, RSocket rSocket, Function<Frame, ? extends Payload> function, Consumer<Throwable> consumer, long j, long j2) {
        this.requestHandler = rSocket;
        this.responderRSocket = rSocket instanceof ResponderRSocket ? (ResponderRSocket) rSocket : null;
        this.connection = duplexConnection;
        this.frameDecoder = function;
        this.errorConsumer = consumer;
        this.sendingLimitableSubscriptions = Collections.synchronizedMap(new IntObjectHashMap());
        this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap());
        this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap());
        this.sendProcessor = new UnboundedProcessor<>();
        duplexConnection.send(this.sendProcessor.doOnRequest(j3 -> {
            Iterator<LimitableRequestPublisher> it = this.sendingLimitableSubscriptions.values().iterator();
            while (it.hasNext()) {
                it.next().increaseInternalLimit(j3);
            }
        })).doFinally(this::handleSendProcessorCancel).subscribe((Consumer) null, this::handleSendProcessorError);
        Disposable subscribe = duplexConnection.receive().subscribe(this::handleFrame, consumer);
        this.connection.onClose().doFinally(signalType -> {
            cleanup();
            subscribe.dispose();
        }).subscribe((Consumer) null, consumer);
        if (j == 0) {
            this.keepAliveHandler = null;
            return;
        }
        this.keepAliveHandler = KeepAliveHandler.ofServer(new KeepAliveHandler.KeepAlive(j, j2));
        this.keepAliveHandler.timeout().subscribe(keepAlive -> {
            consumer.accept(new ConnectionErrorException(String.format("No keep-alive acks for %d ms", Long.valueOf(keepAlive.getTimeoutMillis()))));
            duplexConnection.dispose();
        });
        Flux<Frame> send = this.keepAliveHandler.send();
        UnboundedProcessor<Frame> unboundedProcessor = this.sendProcessor;
        unboundedProcessor.getClass();
        send.subscribe((v1) -> {
            r1.onNext(v1);
        });
    }

    private void handleSendProcessorError(Throwable th) {
        this.sendingSubscriptions.values().forEach(subscription -> {
            try {
                subscription.cancel();
            } catch (Throwable th2) {
                this.errorConsumer.accept(th2);
            }
        });
        this.sendingLimitableSubscriptions.values().forEach(limitableRequestPublisher -> {
            try {
                limitableRequestPublisher.cancel();
            } catch (Throwable th2) {
                this.errorConsumer.accept(th2);
            }
        });
        this.channelProcessors.values().forEach(processor -> {
            try {
                processor.onError(th);
            } catch (Throwable th2) {
                this.errorConsumer.accept(th2);
            }
        });
    }

    private void handleSendProcessorCancel(SignalType signalType) {
        if (SignalType.ON_ERROR == signalType) {
            return;
        }
        this.sendingSubscriptions.values().forEach(subscription -> {
            try {
                subscription.cancel();
            } catch (Throwable th) {
                this.errorConsumer.accept(th);
            }
        });
        this.sendingLimitableSubscriptions.values().forEach(limitableRequestPublisher -> {
            try {
                limitableRequestPublisher.cancel();
            } catch (Throwable th) {
                this.errorConsumer.accept(th);
            }
        });
        this.channelProcessors.values().forEach(processor -> {
            try {
                processor.onComplete();
            } catch (Throwable th) {
                this.errorConsumer.accept(th);
            }
        });
    }

    @Override // io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            return this.requestHandler.fireAndForget(payload);
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            return this.requestHandler.requestResponse(payload);
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        try {
            return this.requestHandler.requestStream(payload);
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        try {
            return this.requestHandler.requestChannel(publisher);
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    @Override // io.rsocket.ResponderRSocket
    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
        try {
            return this.responderRSocket.requestChannel(payload, publisher);
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.requestHandler.metadataPush(payload);
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    public void dispose() {
        this.connection.dispose();
    }

    public boolean isDisposed() {
        return this.connection.isDisposed();
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.connection.onClose();
    }

    private void cleanup() {
        if (this.keepAliveHandler != null) {
            this.keepAliveHandler.dispose();
        }
        cleanUpSendingSubscriptions();
        cleanUpChannelProcessors();
        this.requestHandler.dispose();
        this.sendProcessor.dispose();
    }

    private synchronized void cleanUpSendingSubscriptions() {
        this.sendingSubscriptions.values().forEach((v0) -> {
            v0.cancel();
        });
        this.sendingSubscriptions.clear();
        this.sendingLimitableSubscriptions.values().forEach((v0) -> {
            v0.cancel();
        });
        this.sendingLimitableSubscriptions.clear();
    }

    private synchronized void cleanUpChannelProcessors() {
        this.channelProcessors.values().forEach((v0) -> {
            v0.onComplete();
        });
        this.channelProcessors.clear();
    }

    private void handleFrame(Frame frame) {
        try {
            int streamId = frame.getStreamId();
            switch (AnonymousClass1.$SwitchMap$io$rsocket$framing$FrameType[frame.getType().ordinal()]) {
                case 1:
                    handleFireAndForget(streamId, fireAndForget(this.frameDecoder.apply(frame)));
                    break;
                case 2:
                    handleRequestResponse(streamId, requestResponse(this.frameDecoder.apply(frame)));
                    break;
                case 3:
                    handleCancelFrame(streamId);
                    break;
                case 4:
                    handleKeepAliveFrame(frame);
                    break;
                case 5:
                    handleRequestN(streamId, frame);
                    break;
                case 6:
                    handleStream(streamId, requestStream(this.frameDecoder.apply(frame)), Frame.Request.initialRequestN(frame));
                    break;
                case 7:
                    handleChannel(streamId, this.frameDecoder.apply(frame), Frame.Request.initialRequestN(frame));
                    break;
                case 8:
                    metadataPush(this.frameDecoder.apply(frame));
                    break;
                case 9:
                    break;
                case io.rsocket.framing.Frame.FRAME_TYPE_SHIFT /* 10 */:
                    break;
                case 11:
                    Subscriber subscriber = this.channelProcessors.get(Integer.valueOf(streamId));
                    if (subscriber != null) {
                        subscriber.onNext(this.frameDecoder.apply(frame));
                        break;
                    }
                    break;
                case 12:
                    Subscriber subscriber2 = this.channelProcessors.get(Integer.valueOf(streamId));
                    if (subscriber2 != null) {
                        subscriber2.onComplete();
                        break;
                    }
                    break;
                case 13:
                    Subscriber subscriber3 = this.channelProcessors.get(Integer.valueOf(streamId));
                    if (subscriber3 != null) {
                        subscriber3.onError(new ApplicationErrorException(Frame.Error.message(frame)));
                        break;
                    }
                    break;
                case 14:
                    Subscriber subscriber4 = this.channelProcessors.get(Integer.valueOf(streamId));
                    if (subscriber4 != null) {
                        subscriber4.onNext(this.frameDecoder.apply(frame));
                        subscriber4.onComplete();
                        break;
                    }
                    break;
                case 15:
                    handleError(streamId, new IllegalStateException("Setup frame received post setup."));
                    break;
                default:
                    handleError(streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frame.getType()));
                    break;
            }
        } finally {
            frame.release();
        }
    }

    private void handleFireAndForget(int i, Mono<Void> mono) {
        mono.doOnSubscribe(subscription -> {
            this.sendingSubscriptions.put(Integer.valueOf(i), subscription);
        }).doFinally(signalType -> {
            this.sendingSubscriptions.remove(Integer.valueOf(i));
        }).subscribe((Consumer) null, this.errorConsumer);
    }

    private void handleRequestResponse(int i, Mono<Payload> mono) {
        Mono doFinally = mono.doOnSubscribe(subscription -> {
            this.sendingSubscriptions.put(Integer.valueOf(i), subscription);
        }).map(payload -> {
            int i2 = 64;
            if (payload.hasMetadata()) {
                i2 = Frame.setFlag(64, FrameHeaderFlyweight.FLAGS_M);
            }
            Frame from = Frame.PayloadFrame.from(i, FrameType.NEXT_COMPLETE, payload, i2);
            payload.release();
            return from;
        }).switchIfEmpty(Mono.fromCallable(() -> {
            return Frame.PayloadFrame.from(i, FrameType.COMPLETE);
        })).doFinally(signalType -> {
            this.sendingSubscriptions.remove(Integer.valueOf(i));
        });
        UnboundedProcessor<Frame> unboundedProcessor = this.sendProcessor;
        unboundedProcessor.getClass();
        doFinally.subscribe((v1) -> {
            r1.onNext(v1);
        }, th -> {
            handleError(i, th);
        });
    }

    private void handleStream(int i, Flux<Payload> flux, int i2) {
        flux.transform(flux2 -> {
            LimitableRequestPublisher wrap = LimitableRequestPublisher.wrap(flux2, this.sendProcessor.available());
            this.sendingLimitableSubscriptions.put(Integer.valueOf(i), wrap);
            wrap.increaseRequestLimit(i2);
            return wrap;
        }).doFinally(signalType -> {
            this.sendingLimitableSubscriptions.remove(Integer.valueOf(i));
        }).subscribe(payload -> {
            Frame from = Frame.PayloadFrame.from(i, FrameType.NEXT, payload);
            payload.release();
            this.sendProcessor.onNext(from);
        }, th -> {
            handleError(i, th);
        }, () -> {
            this.sendProcessor.onNext(Frame.PayloadFrame.from(i, FrameType.COMPLETE));
        });
    }

    private void handleChannel(int i, Payload payload, int i2) {
        Processor<Payload, Payload> create = UnicastProcessor.create();
        this.channelProcessors.put(Integer.valueOf(i), create);
        Flux doFinally = create.doOnCancel(() -> {
            this.sendProcessor.onNext(Frame.Cancel.from(i));
        }).doOnError(th -> {
            this.sendProcessor.onNext(Frame.Error.from(i, th));
        }).doOnRequest(j -> {
            this.sendProcessor.onNext(Frame.RequestN.from(i, j));
        }).doFinally(signalType -> {
            this.channelProcessors.remove(Integer.valueOf(i));
        });
        create.onNext(payload);
        if (this.responderRSocket != null) {
            handleStream(i, requestChannel(payload, doFinally), i2);
        } else {
            handleStream(i, requestChannel(doFinally), i2);
        }
    }

    private void handleKeepAliveFrame(Frame frame) {
        if (this.keepAliveHandler != null) {
            this.keepAliveHandler.receive(frame);
        }
    }

    private void handleCancelFrame(int i) {
        Subscription remove = this.sendingSubscriptions.remove(Integer.valueOf(i));
        if (remove == null) {
            remove = this.sendingLimitableSubscriptions.get(Integer.valueOf(i));
        }
        if (remove != null) {
            remove.cancel();
        }
    }

    private void handleError(int i, Throwable th) {
        this.errorConsumer.accept(th);
        this.sendProcessor.onNext(Frame.Error.from(i, th));
    }

    private void handleRequestN(int i, Frame frame) {
        Subscription subscription = this.sendingSubscriptions.get(Integer.valueOf(i));
        if (subscription == null) {
            subscription = this.sendingLimitableSubscriptions.get(Integer.valueOf(i));
        }
        if (subscription != null) {
            int requestN = Frame.RequestN.requestN(frame);
            subscription.request(requestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : requestN);
        }
    }
}
