package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.CancelFrameCodec;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.frame.RequestChannelFrameCodec;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.RequestStreamFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.lease.ResponderLeaseHandler;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/rsocket/core/RSocketResponder.class */
public class RSocketResponder implements RSocket {
    private final DuplexConnection connection;
    private final RSocket requestHandler;
    private final ResponderRSocket responderRSocket;
    private final PayloadDecoder payloadDecoder;
    private final ResponderLeaseHandler leaseHandler;
    private final Disposable leaseHandlerDisposable;
    private volatile Throwable terminationError;
    private final int mtu;
    private final int maxFrameLength;
    private final IntObjectMap<Subscription> sendingSubscriptions;
    private final IntObjectMap<Processor<Payload, Payload>> channelProcessors;
    private final UnboundedProcessor<ByteBuf> sendProcessor;
    private final ByteBufAllocator allocator;
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketResponder.class);
    private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER = referenceCounted -> {
        if (referenceCounted.refCnt() > 0) {
            try {
                referenceCounted.release();
            } catch (IllegalReferenceCountException e) {
            }
        }
    };
    private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    private static final AtomicReferenceFieldUpdater<RSocketResponder, Throwable> TERMINATION_ERROR = AtomicReferenceFieldUpdater.newUpdater(RSocketResponder.class, Throwable.class, "terminationError");

    /* renamed from: io.rsocket.core.RSocketResponder$6, reason: invalid class name */
    /* loaded from: input_file:io/rsocket/core/RSocketResponder$6.class */
    static /* synthetic */ class AnonymousClass6 {
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$frame$FrameType = new int[FrameType.values().length];

        static {
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_FNF.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_RESPONSE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.CANCEL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_N.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_STREAM.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_CHANNEL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.METADATA_PUSH.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.PAYLOAD.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.NEXT.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.COMPLETE.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.ERROR.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.NEXT_COMPLETE.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.SETUP.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.LEASE.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RSocketResponder(DuplexConnection duplexConnection, RSocket rSocket, PayloadDecoder payloadDecoder, ResponderLeaseHandler responderLeaseHandler, int i, int i2) {
        this.connection = duplexConnection;
        this.allocator = duplexConnection.alloc();
        this.mtu = i;
        this.maxFrameLength = i2;
        this.requestHandler = rSocket;
        this.responderRSocket = rSocket instanceof ResponderRSocket ? (ResponderRSocket) rSocket : null;
        this.payloadDecoder = payloadDecoder;
        this.leaseHandler = responderLeaseHandler;
        this.sendingSubscriptions = new SynchronizedIntObjectHashMap();
        this.channelProcessors = new SynchronizedIntObjectHashMap();
        this.sendProcessor = new UnboundedProcessor<>();
        duplexConnection.send(this.sendProcessor).subscribe((Consumer) null, this::handleSendProcessorError);
        duplexConnection.receive().subscribe(this::handleFrame, th -> {
        });
        UnboundedProcessor<ByteBuf> unboundedProcessor = this.sendProcessor;
        unboundedProcessor.getClass();
        this.leaseHandlerDisposable = responderLeaseHandler.send((v1) -> {
            r2.onNextPrioritized(v1);
        });
        this.connection.onClose().subscribe((Consumer) null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
    }

    private void handleSendProcessorError(Throwable th) {
        this.sendingSubscriptions.values().forEach(subscription -> {
            try {
                subscription.cancel();
            } catch (Throwable th2) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Dropped exception", th);
                }
            }
        });
        this.channelProcessors.values().forEach(processor -> {
            try {
                processor.onError(th);
            } catch (Throwable th2) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Dropped exception", th);
                }
            }
        });
    }

    private void tryTerminateOnConnectionError(Throwable th) {
        tryTerminate(() -> {
            return th;
        });
    }

    private void tryTerminateOnConnectionClose() {
        tryTerminate(() -> {
            return CLOSED_CHANNEL_EXCEPTION;
        });
    }

    private void tryTerminate(Supplier<Throwable> supplier) {
        if (this.terminationError == null) {
            Throwable th = supplier.get();
            if (TERMINATION_ERROR.compareAndSet(this, null, th)) {
                cleanup(th);
            }
        }
    }

    @Override // io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.requestHandler.fireAndForget(payload);
            }
            payload.release();
            return Mono.error(this.leaseHandler.leaseError());
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.requestHandler.requestResponse(payload);
            }
            payload.release();
            return Mono.error(this.leaseHandler.leaseError());
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.requestHandler.requestStream(payload);
            }
            payload.release();
            return Flux.error(this.leaseHandler.leaseError());
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        try {
            return this.leaseHandler.useLease() ? this.requestHandler.requestChannel(publisher) : Flux.error(this.leaseHandler.leaseError());
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    private Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
        try {
            if (this.leaseHandler.useLease()) {
                return this.responderRSocket.requestChannel(payload, publisher);
            }
            payload.release();
            return Flux.error(this.leaseHandler.leaseError());
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.requestHandler.metadataPush(payload);
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    @Override // io.rsocket.RSocket
    public void dispose() {
        tryTerminate(() -> {
            return new CancellationException("Disposed");
        });
    }

    @Override // io.rsocket.RSocket
    public boolean isDisposed() {
        return this.connection.isDisposed();
    }

    @Override // io.rsocket.RSocket, io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.connection.onClose();
    }

    private void cleanup(Throwable th) {
        cleanUpSendingSubscriptions();
        cleanUpChannelProcessors(th);
        this.connection.dispose();
        this.leaseHandlerDisposable.dispose();
        this.requestHandler.dispose();
        this.sendProcessor.dispose();
    }

    private synchronized void cleanUpSendingSubscriptions() {
        this.sendingSubscriptions.values().forEach((v0) -> {
            v0.cancel();
        });
        this.sendingSubscriptions.clear();
    }

    private synchronized void cleanUpChannelProcessors(Throwable th) {
        this.channelProcessors.values().forEach(processor -> {
            try {
                processor.onError(th);
            } catch (Throwable th2) {
            }
        });
        this.channelProcessors.clear();
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:3:0x0014. Please report as an issue. */
    private void handleFrame(ByteBuf byteBuf) {
        try {
            int streamId = FrameHeaderCodec.streamId(byteBuf);
            FrameType frameType = FrameHeaderCodec.frameType(byteBuf);
            switch (AnonymousClass6.$SwitchMap$io$rsocket$frame$FrameType[frameType.ordinal()]) {
                case ErrorFrameCodec.INVALID_SETUP /* 1 */:
                    handleFireAndForget(streamId, fireAndForget(this.payloadDecoder.apply(byteBuf)));
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case ErrorFrameCodec.UNSUPPORTED_SETUP /* 2 */:
                    handleRequestResponse(streamId, requestResponse(this.payloadDecoder.apply(byteBuf)));
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 3:
                    handleCancelFrame(streamId);
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case ErrorFrameCodec.REJECTED_RESUME /* 4 */:
                    handleRequestN(streamId, byteBuf);
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 5:
                    handleStream(streamId, requestStream(this.payloadDecoder.apply(byteBuf)), RequestStreamFrameCodec.initialRequestN(byteBuf), null);
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 6:
                    handleChannel(streamId, this.payloadDecoder.apply(byteBuf), RequestChannelFrameCodec.initialRequestN(byteBuf));
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 7:
                    handleMetadataPush(metadataPush(this.payloadDecoder.apply(byteBuf)));
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case SynchronizedIntObjectHashMap.DEFAULT_CAPACITY /* 8 */:
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 9:
                    Subscriber subscriber = (Subscriber) this.channelProcessors.get(streamId);
                    if (subscriber != null) {
                        subscriber.onNext(this.payloadDecoder.apply(byteBuf));
                    }
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 10:
                    Subscriber subscriber2 = (Subscriber) this.channelProcessors.get(streamId);
                    if (subscriber2 != null) {
                        subscriber2.onComplete();
                    }
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 11:
                    Subscriber subscriber3 = (Subscriber) this.channelProcessors.get(streamId);
                    if (subscriber3 != null) {
                        try {
                            subscriber3.onError(Exceptions.from(streamId, byteBuf));
                        } catch (RuntimeException e) {
                            if ((reactor.core.Exceptions.isBubbling(e) || reactor.core.Exceptions.isErrorCallbackNotImplemented(e)) && LOGGER.isDebugEnabled()) {
                                LOGGER.debug("Unhandled dropped exception", reactor.core.Exceptions.unwrap(e));
                            }
                        }
                    }
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 12:
                    Subscriber subscriber4 = (Subscriber) this.channelProcessors.get(streamId);
                    if (subscriber4 != null) {
                        subscriber4.onNext(this.payloadDecoder.apply(byteBuf));
                        subscriber4.onComplete();
                    }
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 13:
                    handleError(streamId, new IllegalStateException("Setup frame received post setup."));
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
                case 14:
                default:
                    handleError(streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frameType));
                    ReferenceCountUtil.safeRelease(byteBuf);
                    return;
            }
        } catch (Throwable th) {
            ReferenceCountUtil.safeRelease(byteBuf);
            throw reactor.core.Exceptions.propagate(th);
        }
    }

    private void handleFireAndForget(final int i, Mono<Void> mono) {
        mono.subscribe(new BaseSubscriber<Void>() { // from class: io.rsocket.core.RSocketResponder.1
            protected void hookOnSubscribe(Subscription subscription) {
                RSocketResponder.this.sendingSubscriptions.put(i, subscription);
                subscription.request(Long.MAX_VALUE);
            }

            protected void hookOnError(Throwable th) {
            }

            protected void hookFinally(SignalType signalType) {
                RSocketResponder.this.sendingSubscriptions.remove(i);
            }
        });
    }

    private void handleRequestResponse(final int i, Mono<Payload> mono) {
        BaseSubscriber<Payload> baseSubscriber = new BaseSubscriber<Payload>() { // from class: io.rsocket.core.RSocketResponder.2
            private boolean isEmpty = true;

            /* JADX INFO: Access modifiers changed from: protected */
            public void hookOnNext(Payload payload) {
                if (this.isEmpty) {
                    this.isEmpty = false;
                }
                if (PayloadValidationUtils.isValid(RSocketResponder.this.mtu, payload, RSocketResponder.this.maxFrameLength)) {
                    RSocketResponder.this.sendProcessor.onNext(PayloadFrameCodec.encodeNextCompleteReleasingPayload(RSocketResponder.this.allocator, i, payload));
                } else {
                    payload.release();
                    cancel();
                    RSocketResponder.this.handleError(i, new IllegalArgumentException("The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."));
                }
            }

            protected void hookOnError(Throwable th) {
                if (RSocketResponder.this.sendingSubscriptions.remove(Integer.valueOf(i), this)) {
                    RSocketResponder.this.handleError(i, th);
                }
            }

            protected void hookOnComplete() {
                if (this.isEmpty && RSocketResponder.this.sendingSubscriptions.remove(Integer.valueOf(i), this)) {
                    RSocketResponder.this.sendProcessor.onNext(PayloadFrameCodec.encodeComplete(RSocketResponder.this.allocator, i));
                }
            }
        };
        this.sendingSubscriptions.put(i, baseSubscriber);
        mono.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(baseSubscriber);
    }

    private void handleStream(final int i, Flux<Payload> flux, final long j, @Nullable final UnicastProcessor<Payload> unicastProcessor) {
        BaseSubscriber<Payload> baseSubscriber = new BaseSubscriber<Payload>() { // from class: io.rsocket.core.RSocketResponder.3
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(j);
            }

            /* JADX INFO: Access modifiers changed from: protected */
            public void hookOnNext(Payload payload) {
                try {
                    if (PayloadValidationUtils.isValid(RSocketResponder.this.mtu, payload, RSocketResponder.this.maxFrameLength)) {
                        RSocketResponder.this.sendProcessor.onNext(PayloadFrameCodec.encodeNextReleasingPayload(RSocketResponder.this.allocator, i, payload));
                    } else {
                        payload.release();
                        cancelStream(new IllegalArgumentException("The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory."));
                    }
                } catch (Throwable th) {
                    cancelStream(th);
                }
            }

            private void cancelStream(Throwable th) {
                if (unicastProcessor != null) {
                    RSocketResponder.this.channelProcessors.remove(Integer.valueOf(i), unicastProcessor);
                }
                cancel();
                RSocketResponder.this.handleError(i, th);
            }

            protected void hookOnComplete() {
                if (RSocketResponder.this.sendingSubscriptions.remove(Integer.valueOf(i), this)) {
                    RSocketResponder.this.sendProcessor.onNext(PayloadFrameCodec.encodeComplete(RSocketResponder.this.allocator, i));
                }
            }

            protected void hookOnError(Throwable th) {
                if (RSocketResponder.this.sendingSubscriptions.remove(Integer.valueOf(i), this)) {
                    if (unicastProcessor != null && !unicastProcessor.isDisposed() && RSocketResponder.this.channelProcessors.remove(Integer.valueOf(i), unicastProcessor)) {
                        try {
                            unicastProcessor.dispose();
                        } catch (Throwable th2) {
                        }
                    }
                    RSocketResponder.this.handleError(i, th);
                }
            }
        };
        this.sendingSubscriptions.put(i, baseSubscriber);
        flux.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(baseSubscriber);
    }

    private void handleChannel(final int i, Payload payload, long j) {
        UnicastProcessor<Payload> create = UnicastProcessor.create();
        this.channelProcessors.put(i, create);
        Flux doOnDiscard = create.doOnRequest(new LongConsumer() { // from class: io.rsocket.core.RSocketResponder.4
            boolean first = true;

            @Override // java.util.function.LongConsumer
            public void accept(long j2) {
                long j3;
                if (this.first) {
                    this.first = false;
                    j3 = j2 - 1;
                } else {
                    j3 = j2;
                }
                if (j3 > 0) {
                    RSocketResponder.this.sendProcessor.onNext(RequestNFrameCodec.encode(RSocketResponder.this.allocator, i, j3));
                }
            }
        }).doFinally(signalType -> {
            Subscription subscription;
            if (this.channelProcessors.remove(Integer.valueOf(i), create)) {
                if (signalType == SignalType.CANCEL) {
                    this.sendProcessor.onNext(CancelFrameCodec.encode(this.allocator, i));
                } else {
                    if (signalType != SignalType.ON_ERROR || (subscription = (Subscription) this.sendingSubscriptions.remove(i)) == null) {
                        return;
                    }
                    subscription.cancel();
                }
            }
        }).doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
        create.onNext(payload);
        if (this.responderRSocket != null) {
            handleStream(i, requestChannel(payload, doOnDiscard), j, create);
        } else {
            handleStream(i, requestChannel(doOnDiscard), j, create);
        }
    }

    private void handleMetadataPush(Mono<Void> mono) {
        mono.subscribe(new BaseSubscriber<Void>() { // from class: io.rsocket.core.RSocketResponder.5
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            protected void hookOnError(Throwable th) {
            }
        });
    }

    private void handleCancelFrame(int i) {
        Subscription subscription = (Subscription) this.sendingSubscriptions.remove(i);
        Processor processor = (Processor) this.channelProcessors.remove(i);
        if (processor != null) {
            try {
                processor.onError(new CancellationException("Disposed"));
            } catch (Exception e) {
            }
        }
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleError(int i, Throwable th) {
        this.sendProcessor.onNext(ErrorFrameCodec.encode(this.allocator, i, th));
    }

    private void handleRequestN(int i, ByteBuf byteBuf) {
        Subscription subscription = (Subscription) this.sendingSubscriptions.get(i);
        if (subscription != null) {
            subscription.request(RequestNFrameCodec.requestN(byteBuf));
        }
    }
}
