package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.exceptions.CanceledException;
import io.rsocket.frame.CancelFrameCodec;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.UnboundedProcessor;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/* loaded from: input_file:io/rsocket/core/RequestChannelResponderSubscriber.class */
final class RequestChannelResponderSubscriber extends Flux<Payload> implements ResponderFrameHandler, Subscription, CoreSubscriber<Payload> {
    final int streamId;
    final ByteBufAllocator allocator;
    final PayloadDecoder payloadDecoder;
    final int mtu;
    final int maxFrameLength;
    final int maxInboundPayloadSize;
    final RequesterResponderSupport requesterResponderSupport;
    final UnboundedProcessor<ByteBuf> sendProcessor;
    final long firstRequest;
    final RSocket handler;
    volatile long state;
    Payload firstPayload;
    Subscription outboundSubscription;
    CoreSubscriber<? super Payload> inboundSubscriber;
    CompositeByteBuf frames;
    volatile Throwable inboundError;
    boolean inboundDone;
    boolean outboundDone;
    static final Logger logger = LoggerFactory.getLogger(RequestChannelResponderSubscriber.class);
    static final AtomicLongFieldUpdater<RequestChannelResponderSubscriber> STATE = AtomicLongFieldUpdater.newUpdater(RequestChannelResponderSubscriber.class, "state");
    static final AtomicReferenceFieldUpdater<RequestChannelResponderSubscriber, Throwable> INBOUND_ERROR = AtomicReferenceFieldUpdater.newUpdater(RequestChannelResponderSubscriber.class, Throwable.class, "inboundError");

    public RequestChannelResponderSubscriber(int i, long j, ByteBuf byteBuf, RequesterResponderSupport requesterResponderSupport, RSocket rSocket) {
        this.streamId = i;
        this.allocator = requesterResponderSupport.getAllocator();
        this.mtu = requesterResponderSupport.getMtu();
        this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
        this.maxInboundPayloadSize = requesterResponderSupport.getMaxInboundPayloadSize();
        this.requesterResponderSupport = requesterResponderSupport;
        this.sendProcessor = requesterResponderSupport.getSendProcessor();
        this.payloadDecoder = requesterResponderSupport.getPayloadDecoder();
        this.handler = rSocket;
        this.firstRequest = j;
        this.frames = ReassemblyUtils.addFollowingFrame(this.allocator.compositeBuffer(), byteBuf, true, this.maxInboundPayloadSize);
        STATE.lazySet(this, 8589934592L);
    }

    public RequestChannelResponderSubscriber(int i, long j, Payload payload, RequesterResponderSupport requesterResponderSupport) {
        this.streamId = i;
        this.allocator = requesterResponderSupport.getAllocator();
        this.mtu = requesterResponderSupport.getMtu();
        this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
        this.maxInboundPayloadSize = requesterResponderSupport.getMaxInboundPayloadSize();
        this.requesterResponderSupport = requesterResponderSupport;
        this.sendProcessor = requesterResponderSupport.getSendProcessor();
        this.payloadDecoder = requesterResponderSupport.getPayloadDecoder();
        this.firstRequest = j;
        this.firstPayload = payload;
        this.handler = null;
        this.frames = null;
    }

    public void subscribe(CoreSubscriber<? super Payload> coreSubscriber) {
        long markSubscribed = StateUtils.markSubscribed(STATE, this);
        if (StateUtils.isTerminated(markSubscribed)) {
            Throwable terminate = Exceptions.terminate(INBOUND_ERROR, this);
            if (terminate != Exceptions.TERMINATED) {
                Operators.error(coreSubscriber, terminate);
                return;
            } else {
                Operators.error(coreSubscriber, new CancellationException("RequestChannelSubscriber has already been terminated"));
                return;
            }
        }
        if (StateUtils.isSubscribed(markSubscribed)) {
            Operators.error(coreSubscriber, new IllegalStateException("RequestChannelSubscriber allows only one Subscriber"));
        } else {
            this.inboundSubscriber = coreSubscriber;
            coreSubscriber.onSubscribe(this);
        }
    }

    public void onSubscribe(Subscription subscription) {
        if (Operators.validate(this.outboundSubscription, subscription)) {
            this.outboundSubscription = subscription;
            subscription.request(this.firstRequest);
        }
    }

    public void request(long j) {
        if (Operators.validate(j)) {
            long addRequestN = StateUtils.addRequestN(STATE, this, j);
            if (StateUtils.isTerminated(addRequestN)) {
                Throwable terminate = Exceptions.terminate(INBOUND_ERROR, this);
                if (terminate == Exceptions.TERMINATED) {
                    return;
                }
                if (terminate != null || this.inboundDone) {
                    CoreSubscriber<? super Payload> coreSubscriber = this.inboundSubscriber;
                    Payload payload = this.firstPayload;
                    if (payload != null) {
                        this.firstPayload = null;
                        coreSubscriber.onNext(payload);
                    }
                    if (terminate != null) {
                        coreSubscriber.onError(terminate);
                        return;
                    } else {
                        coreSubscriber.onComplete();
                        return;
                    }
                }
                return;
            }
            if (StateUtils.isInboundTerminated(addRequestN)) {
                if (StateUtils.hasRequested(addRequestN) || StateUtils.isFirstFrameSent(addRequestN) || !this.inboundDone) {
                    return;
                }
                CoreSubscriber<? super Payload> coreSubscriber2 = this.inboundSubscriber;
                Payload payload2 = this.firstPayload;
                this.firstPayload = null;
                coreSubscriber2.onNext(payload2);
                coreSubscriber2.onComplete();
                StateUtils.markFirstFrameSent(STATE, this);
                return;
            }
            if (StateUtils.hasRequested(addRequestN)) {
                if (!StateUtils.isFirstFrameSent(addRequestN) || StateUtils.isMaxAllowedRequestN(StateUtils.extractRequestN(addRequestN))) {
                    return;
                }
                this.sendProcessor.onNext(RequestNFrameCodec.encode(this.allocator, this.streamId, j));
                return;
            }
            CoreSubscriber<? super Payload> coreSubscriber3 = this.inboundSubscriber;
            Payload payload3 = this.firstPayload;
            this.firstPayload = null;
            coreSubscriber3.onNext(payload3);
            long markFirstFrameSent = StateUtils.markFirstFrameSent(STATE, this);
            if (StateUtils.isTerminated(markFirstFrameSent)) {
                Throwable terminate2 = Exceptions.terminate(INBOUND_ERROR, this);
                if (terminate2 == Exceptions.TERMINATED) {
                    return;
                }
                if (terminate2 != null) {
                    coreSubscriber3.onError(terminate2);
                    return;
                } else {
                    if (this.inboundDone) {
                        coreSubscriber3.onComplete();
                        return;
                    }
                    return;
                }
            }
            if (StateUtils.isInboundTerminated(markFirstFrameSent)) {
                if (this.inboundDone) {
                    coreSubscriber3.onComplete();
                    return;
                }
                return;
            }
            long extractRequestN = StateUtils.extractRequestN(markFirstFrameSent);
            if (StateUtils.isMaxAllowedRequestN(extractRequestN)) {
                this.sendProcessor.onNext(RequestNFrameCodec.encode(this.allocator, this.streamId, extractRequestN));
                return;
            }
            long j2 = extractRequestN - 1;
            if (j2 > 0) {
                this.sendProcessor.onNext(RequestNFrameCodec.encode(this.allocator, this.streamId, j2));
            }
        }
    }

    public void cancel() {
        long markInboundTerminated = StateUtils.markInboundTerminated(STATE, this);
        if (StateUtils.isTerminated(markInboundTerminated) || StateUtils.isInboundTerminated(markInboundTerminated)) {
            return;
        }
        if (!StateUtils.isFirstFrameSent(markInboundTerminated) && !StateUtils.hasRequested(markInboundTerminated)) {
            Payload payload = this.firstPayload;
            this.firstPayload = null;
            payload.release();
        }
        int i = this.streamId;
        if (StateUtils.isOutboundTerminated(markInboundTerminated)) {
            this.requesterResponderSupport.remove(i, this);
        }
        this.sendProcessor.onNext(CancelFrameCodec.encode(this.allocator, i));
    }

    @Override // io.rsocket.core.FrameHandler
    public final void handleCancel() {
        if (this.outboundSubscription != null) {
            tryTerminate(true);
            return;
        }
        StateUtils.lazyTerminate(STATE, this);
        this.requesterResponderSupport.remove(this.streamId, this);
        CompositeByteBuf compositeByteBuf = this.frames;
        if (compositeByteBuf != null) {
            this.frames = null;
            compositeByteBuf.release();
        } else {
            Payload payload = this.firstPayload;
            this.firstPayload = null;
            payload.release();
        }
    }

    final long tryTerminate(boolean z) {
        Throwable terminate;
        Exceptions.addThrowable(INBOUND_ERROR, this, new CancellationException("Inbound has been canceled"));
        long markTerminated = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(markTerminated)) {
            return markTerminated;
        }
        this.requesterResponderSupport.remove(this.streamId, this);
        if (StateUtils.isReassembling(markTerminated)) {
            CompositeByteBuf compositeByteBuf = this.frames;
            this.frames = null;
            if (z) {
                compositeByteBuf.release();
            } else {
                synchronized (compositeByteBuf) {
                    compositeByteBuf.release();
                }
            }
        }
        Subscription subscription = this.outboundSubscription;
        if (subscription == null) {
            return markTerminated;
        }
        subscription.cancel();
        if (!StateUtils.isSubscribed(markTerminated)) {
            Payload payload = this.firstPayload;
            this.firstPayload = null;
            payload.release();
        } else if (StateUtils.isFirstFrameSent(markTerminated) && !StateUtils.isInboundTerminated(markTerminated) && (terminate = Exceptions.terminate(INBOUND_ERROR, this)) != Exceptions.TERMINATED) {
            if (z) {
                this.inboundDone = true;
                this.inboundSubscriber.onError(terminate);
            } else {
                synchronized (this) {
                    this.inboundDone = true;
                    this.inboundSubscriber.onError(terminate);
                }
            }
        }
        return markTerminated;
    }

    final void handlePayload(Payload payload) {
        synchronized (this) {
            if (this.inboundDone) {
                payload.release();
            } else {
                this.inboundSubscriber.onNext(payload);
            }
        }
    }

    @Override // io.rsocket.core.ResponderFrameHandler, io.rsocket.core.FrameHandler
    public final void handleError(Throwable th) {
        Throwable terminate;
        if (this.inboundDone) {
            Operators.onErrorDropped(th, this.inboundSubscriber.currentContext());
            return;
        }
        this.inboundDone = true;
        boolean addThrowable = Exceptions.addThrowable(INBOUND_ERROR, this, th);
        long markTerminated = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(markTerminated)) {
            if (addThrowable) {
                return;
            }
            Operators.onErrorDropped(th, this.inboundSubscriber.currentContext());
            return;
        }
        this.requesterResponderSupport.remove(this.streamId, this);
        if (StateUtils.isReassembling(markTerminated)) {
            CompositeByteBuf compositeByteBuf = this.frames;
            this.frames = null;
            compositeByteBuf.release();
        }
        if (!StateUtils.isSubscribed(markTerminated)) {
            Payload payload = this.firstPayload;
            this.firstPayload = null;
            payload.release();
        } else if (StateUtils.isFirstFrameSent(markTerminated) && !StateUtils.isInboundTerminated(markTerminated) && (terminate = Exceptions.terminate(INBOUND_ERROR, this)) != Exceptions.TERMINATED) {
            this.inboundSubscriber.onError(terminate);
        }
        this.outboundSubscription.cancel();
    }

    @Override // io.rsocket.core.ResponderFrameHandler, io.rsocket.core.FrameHandler
    public void handleComplete() {
        if (this.inboundDone) {
            return;
        }
        this.inboundDone = true;
        long markInboundTerminated = StateUtils.markInboundTerminated(STATE, this);
        if (StateUtils.isOutboundTerminated(markInboundTerminated)) {
            this.requesterResponderSupport.remove(this.streamId, this);
        }
        if (StateUtils.isFirstFrameSent(markInboundTerminated)) {
            this.inboundSubscriber.onComplete();
        }
    }

    @Override // io.rsocket.core.FrameHandler
    public void handleNext(ByteBuf byteBuf, boolean z, boolean z2) {
        CompositeByteBuf addFollowingFrame;
        long j = this.state;
        if (StateUtils.isTerminated(j)) {
            return;
        }
        if (!z && !StateUtils.isReassembling(j)) {
            try {
                handlePayload(this.payloadDecoder.apply(byteBuf));
                if (z2) {
                    handleComplete();
                    return;
                }
                return;
            } catch (Throwable th) {
                long tryTerminate = tryTerminate(true);
                if (StateUtils.isTerminated(tryTerminate) || StateUtils.isOutboundTerminated(tryTerminate)) {
                    Operators.onErrorDropped(th, this.inboundSubscriber.currentContext());
                    return;
                }
                this.outboundDone = true;
                this.sendProcessor.onNext(ErrorFrameCodec.encode(this.allocator, this.streamId, new CanceledException(th.getMessage())));
                return;
            }
        }
        CompositeByteBuf compositeByteBuf = this.frames;
        if (compositeByteBuf == null) {
            addFollowingFrame = ReassemblyUtils.addFollowingFrame(this.allocator.compositeBuffer(), byteBuf, z, this.maxInboundPayloadSize);
            this.frames = addFollowingFrame;
            if (StateUtils.isTerminated(StateUtils.markReassembling(STATE, this))) {
                this.frames = null;
                addFollowingFrame.release();
                return;
            }
        } else {
            try {
                addFollowingFrame = ReassemblyUtils.addFollowingFrame(compositeByteBuf, byteBuf, z, this.maxInboundPayloadSize);
            } catch (IllegalStateException e) {
                if (StateUtils.isTerminated(this.state)) {
                    return;
                }
                long tryTerminate2 = tryTerminate(true);
                if (StateUtils.isTerminated(tryTerminate2) || StateUtils.isOutboundTerminated(tryTerminate2)) {
                    Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
                    return;
                }
                this.outboundDone = true;
                this.sendProcessor.onNext(ErrorFrameCodec.encode(this.allocator, this.streamId, new CanceledException("Failed to reassemble payload. Cause: " + e.getMessage())));
                return;
            }
        }
        if (z || StateUtils.isTerminated(StateUtils.markReassembled(STATE, this))) {
            return;
        }
        this.frames = null;
        try {
            Payload apply = this.payloadDecoder.apply(addFollowingFrame);
            addFollowingFrame.release();
            if (this.outboundSubscription == null) {
                this.firstPayload = apply;
                this.handler.requestChannel(this).subscribe(this);
            } else {
                handlePayload(apply);
            }
            if (z2) {
                handleComplete();
            }
        } catch (Throwable th2) {
            ReferenceCountUtil.safeRelease(addFollowingFrame);
            long tryTerminate3 = tryTerminate(true);
            if (StateUtils.isTerminated(tryTerminate3) || StateUtils.isOutboundTerminated(tryTerminate3)) {
                Operators.onErrorDropped(th2, this.inboundSubscriber.currentContext());
            } else {
                this.sendProcessor.onNext(ErrorFrameCodec.encode(this.allocator, this.streamId, new CanceledException("Failed to reassemble payload. Cause: " + th2.getMessage())));
            }
        }
    }

    public void onNext(Payload payload) {
        if (this.outboundDone) {
            ReferenceCountUtil.safeRelease(payload);
            return;
        }
        int i = this.streamId;
        UnboundedProcessor<ByteBuf> unboundedProcessor = this.sendProcessor;
        ByteBufAllocator byteBufAllocator = this.allocator;
        if (payload == null) {
            unboundedProcessor.onNext(PayloadFrameCodec.encodeComplete(byteBufAllocator, i));
            return;
        }
        int i2 = this.mtu;
        try {
            if (PayloadValidationUtils.isValid(i2, this.maxFrameLength, payload, false)) {
                try {
                    SendUtils.sendReleasingPayload(i, FrameType.NEXT, i2, payload, unboundedProcessor, byteBufAllocator, false);
                    return;
                } catch (Throwable th) {
                    tryTerminate(false);
                    return;
                }
            }
            payload.release();
            long tryTerminate = tryTerminate(false);
            if (StateUtils.isTerminated(tryTerminate) || StateUtils.isOutboundTerminated(tryTerminate)) {
                Operators.onErrorDropped(new IllegalArgumentException(String.format("The payload is too big to be send as a single frame with a max frame length %s. Consider enabling fragmentation.", Integer.valueOf(this.maxFrameLength))), this.inboundSubscriber.currentContext());
            } else {
                unboundedProcessor.onNext(ErrorFrameCodec.encode(byteBufAllocator, i, new CanceledException(String.format("The payload is too big to be send as a single frame with a max frame length %s. Consider enabling fragmentation.", Integer.valueOf(this.maxFrameLength)))));
            }
        } catch (IllegalReferenceCountException e) {
            long tryTerminate2 = tryTerminate(false);
            if (StateUtils.isTerminated(tryTerminate2) || StateUtils.isOutboundTerminated(tryTerminate2)) {
                Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
            } else {
                unboundedProcessor.onNext(ErrorFrameCodec.encode(byteBufAllocator, i, new CanceledException("Failed to validate payload. Cause:" + e.getMessage())));
            }
        }
    }

    public void onError(Throwable th) {
        Throwable terminate;
        if (this.outboundDone) {
            Operators.onErrorDropped(th, this.inboundSubscriber.currentContext());
            return;
        }
        boolean addThrowable = Exceptions.addThrowable(INBOUND_ERROR, this, new CancellationException("Outbound has terminated with an error"));
        this.outboundDone = true;
        long markTerminated = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(markTerminated)) {
            Operators.onErrorDropped(th, this.inboundSubscriber.currentContext());
            return;
        }
        int i = this.streamId;
        this.requesterResponderSupport.remove(i, this);
        if (StateUtils.isReassembling(markTerminated)) {
            CompositeByteBuf compositeByteBuf = this.frames;
            this.frames = null;
            synchronized (compositeByteBuf) {
                compositeByteBuf.release();
            }
        }
        if (!StateUtils.isFirstFrameSent(markTerminated) && !StateUtils.hasRequested(markTerminated)) {
            Payload payload = this.firstPayload;
            this.firstPayload = null;
            payload.release();
        }
        if (addThrowable && !StateUtils.isInboundTerminated(markTerminated) && (terminate = Exceptions.terminate(INBOUND_ERROR, this)) != Exceptions.TERMINATED) {
            synchronized (this) {
                this.inboundDone = true;
                this.inboundSubscriber.onError(terminate);
            }
        }
        this.sendProcessor.onNext(ErrorFrameCodec.encode(this.allocator, i, th));
    }

    public void onComplete() {
        if (this.outboundDone) {
            return;
        }
        this.outboundDone = true;
        long markOutboundTerminated = StateUtils.markOutboundTerminated(STATE, this, false);
        if (StateUtils.isTerminated(markOutboundTerminated)) {
            return;
        }
        int i = this.streamId;
        if (StateUtils.isInboundTerminated(markOutboundTerminated)) {
            this.requesterResponderSupport.remove(i, this);
        }
        this.sendProcessor.onNext(PayloadFrameCodec.encodeComplete(this.allocator, i));
    }

    @Override // io.rsocket.core.ResponderFrameHandler, io.rsocket.core.FrameHandler
    public final void handleRequestN(long j) {
        this.outboundSubscription.request(j);
    }

    public Context currentContext() {
        return SendUtils.DISCARD_CONTEXT;
    }
}
