package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.internal.UnboundedProcessor;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:io/rsocket/resume/ResumableDuplexConnection.class */
public class ResumableDuplexConnection extends Flux<ByteBuf> implements DuplexConnection, Subscription {
    // Tag supplied to the constructor; included in the debug log written by initConnection().
    final String tag;
    // Store used for three things in this class: saveFrames() on outgoing frames,
    // resumeStream() to feed frames to a connection, and resumableFrameReceived() on incoming frames.
    final ResumableFramesStore resumableFramesStore;
    // Handle of the saveFrames(savableFramesSender) subscription started in the constructor.
    final Disposable framesSaverDisposable;
    // Remote address captured once from the connection passed to the constructor.
    final SocketAddress remoteAddress;
    // The downstream subscriber of receive(); assigned in subscribe().
    CoreSubscriber<? super ByteBuf> receiveSubscriber;
    // Subscriber attached to the current connection's receive(); assigned in initConnection(),
    // so it stays null until the first initConnection() call.
    FrameReceivingSubscriber activeReceivingSubscriber;
    // Subscription state, advanced via STATE: 0 = no subscriber yet, 1 = subscriber attached
    // (see subscribe()), 2 = first request() observed and initConnection() triggered.
    volatile int state;
    // Connection currently in use; replaced by DisposedConnection.INSTANCE in dispose() and sendErrorAndClose().
    volatile DuplexConnection activeConnection;
    static final Logger logger = LoggerFactory.getLogger((Class<?>) ResumableDuplexConnection.class);
    // Atomic updater for the `state` field.
    static final AtomicIntegerFieldUpdater<ResumableDuplexConnection> STATE = AtomicIntegerFieldUpdater.newUpdater(ResumableDuplexConnection.class, "state");
    // Atomic updater for the `activeConnection` field.
    static final AtomicReferenceFieldUpdater<ResumableDuplexConnection, DuplexConnection> ACTIVE_CONNECTION = AtomicReferenceFieldUpdater.newUpdater(ResumableDuplexConnection.class, DuplexConnection.class, "activeConnection");
    // Incremented by every initConnection() call; the pre-increment value is emitted on
    // onConnectionClosedSink when that connection's onClose() terminates.
    int connectionIndex = 0;
    // Emits the index of each connection that has closed; exposed through onActiveConnectionClosed().
    final Sinks.Many<Integer> onConnectionClosedSink = Sinks.unsafe().many().unicast().onBackpressureBuffer();
    // Queue of outgoing frames written by sendFrame() and consumed by resumableFramesStore.saveFrames().
    final UnboundedProcessor savableFramesSender = new UnboundedProcessor();
    // Completion signal of this connection as a whole; returned by onClose().
    final MonoProcessor<Void> onClose = MonoProcessor.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/rsocket/resume/ResumableDuplexConnection$DisposedConnection.class */
    /**
     * Inert {@link DuplexConnection} placeholder. The enclosing class stores {@link #INSTANCE} in
     * {@code activeConnection} once it has been disposed, so later calls hit no-op methods
     * instead of a real transport.
     */
    public static final class DisposedConnection implements DuplexConnection {
        /** The only instance; compared by identity throughout the enclosing class. */
        static final DisposedConnection INSTANCE = new DisposedConnection();

        private DisposedConnection() {
            // singleton, see INSTANCE
        }

        /** Frames sent to the placeholder are ignored. */
        @Override // io.rsocket.DuplexConnection
        public void sendFrame(int i, ByteBuf byteBuf) {
            // intentionally empty
        }

        /** Nothing to close; the error is ignored. */
        @Override // io.rsocket.DuplexConnection
        public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
            // intentionally empty
        }

        /** @return a stream that never emits and never terminates */
        @Override // io.rsocket.DuplexConnection
        public Flux<ByteBuf> receive() {
            return Flux.never();
        }

        /** @return the default Netty allocator */
        @Override // io.rsocket.DuplexConnection
        public ByteBufAllocator alloc() {
            return ByteBufAllocator.DEFAULT;
        }

        /** @return always {@code null}; the placeholder has no peer */
        @Override // io.rsocket.DuplexConnection
        public SocketAddress remoteAddress() {
            return null;
        }

        /** @return a signal that never terminates */
        @Override // io.rsocket.Closeable
        public Mono<Void> onClose() {
            return Mono.never();
        }

        /** No resources are held, so there is nothing to release. */
        @Override // reactor.core.Disposable
        public void dispose() {
            // intentionally empty
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/rsocket/resume/ResumableDuplexConnection$FrameReceivingSubscriber.class */
    /**
     * Subscriber attached to one underlying connection's inbound stream. It forwards frames to the
     * downstream subscriber ({@code actual}) and consults the {@link ResumableFramesStore} for
     * frames with a non-zero stream id. Terminal signals of the underlying connection are not
     * propagated downstream; they only mark this subscriber as finished.
     */
    public static final class FrameReceivingSubscriber implements CoreSubscriber<ByteBuf>, Disposable {
        static final AtomicReferenceFieldUpdater<FrameReceivingSubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(FrameReceivingSubscriber.class, Subscription.class, "s");

        final String tag;
        final ResumableFramesStore resumableFramesStore;
        final CoreSubscriber<? super ByteBuf> actual;
        volatile Subscription s;
        boolean cancelled;

        private FrameReceivingSubscriber(String str, ResumableFramesStore resumableFramesStore, CoreSubscriber<? super ByteBuf> coreSubscriber) {
            this.actual = coreSubscriber;
            this.resumableFramesStore = resumableFramesStore;
            this.tag = str;
        }

        /** Stores the upstream subscription once and requests an unbounded number of frames. */
        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (!Operators.setOnce(S, this, subscription)) {
                return;
            }
            subscription.request(Long.MAX_VALUE);
        }

        /**
         * Forwards a frame downstream unless this subscriber is finished. Frames with stream id 0
         * are always forwarded; other frames only when
         * {@code resumableFramesStore.resumableFrameReceived} returns {@code true}.
         */
        @Override // org.reactivestreams.Subscriber
        public void onNext(ByteBuf byteBuf) {
            if (isDisposed()) {
                return;
            }
            // Short-circuit keeps the store from being consulted for non-resumable frames.
            final boolean deliver = !ResumableDuplexConnection.isResumableFrame(byteBuf)
                    || this.resumableFramesStore.resumableFrameReceived(byteBuf);
            if (deliver) {
                this.actual.onNext(byteBuf);
            }
        }

        /** Marks the subscription as finished; the error is not passed downstream. */
        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            markTerminated();
        }

        /** Marks the subscription as finished; completion is not passed downstream. */
        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            markTerminated();
        }

        /** Sets the cancelled flag and terminates the upstream subscription. */
        @Override // reactor.core.Disposable
        public void dispose() {
            this.cancelled = true;
            Operators.terminate(S, this);
        }

        /** @return {@code true} once disposed or after the upstream has terminated */
        @Override // reactor.core.Disposable
        public boolean isDisposed() {
            if (this.cancelled) {
                return true;
            }
            return this.s == Operators.cancelledSubscription();
        }

        /** Replaces the stored subscription with the shared "cancelled" marker. */
        private void markTerminated() {
            Operators.set(S, this, Operators.cancelledSubscription());
        }
    }

    /**
     * Creates a resumable wrapper around an initial connection.
     *
     * <p>The constructor immediately subscribes to
     * {@code resumableFramesStore.saveFrames(savableFramesSender)}, so frames passed to
     * {@link #sendFrame} reach the store from that point on. The initial connection is only
     * recorded here; {@code initConnection} is first invoked from {@link #request(long)}.
     *
     * @param str tag used in log messages
     * @param duplexConnection initial underlying connection; its remote address is captured here
     * @param resumableFramesStore store that persists and replays frames
     */
    public ResumableDuplexConnection(String str, DuplexConnection duplexConnection, ResumableFramesStore resumableFramesStore) {
        this.tag = str;
        this.resumableFramesStore = resumableFramesStore;
        // Starts consuming the outgoing frame queue right away.
        this.framesSaverDisposable = resumableFramesStore.saveFrames(this.savableFramesSender).subscribe();
        this.remoteAddress = duplexConnection.remoteAddress();
        ACTIVE_CONNECTION.lazySet(this, duplexConnection);
    }

    /**
     * Swaps in a new underlying connection, disposing the one it replaces.
     *
     * @param duplexConnection the connection to use from now on
     * @return {@code true} if the swap happened; {@code false} if this connection is already
     *     disposed or the active connection changed concurrently
     */
    public boolean connect(DuplexConnection duplexConnection) {
        final DuplexConnection previous = this.activeConnection;
        final boolean swapped = previous != DisposedConnection.INSTANCE
                && ACTIVE_CONNECTION.compareAndSet(this, previous, duplexConnection);
        if (swapped) {
            previous.dispose();
            initConnection(duplexConnection);
        }
        return swapped;
    }

    /**
     * Wires a connection into this wrapper: attaches a fresh receiving subscriber, pipes
     * {@code resumableFramesStore.resumeStream()} into the connection, and arranges for the
     * connection's index to be published on {@code onConnectionClosedSink} when it closes.
     *
     * @param duplexConnection the connection to initialize
     */
    void initConnection(DuplexConnection duplexConnection) {
        logger.debug("Tag {}. Initializing connection {}", this.tag, duplexConnection);
        // Index of this connection; captured before the counter moves on.
        final int currentIndex = this.connectionIndex;
        final FrameReceivingSubscriber receiver = new FrameReceivingSubscriber(this.tag, this.resumableFramesStore, this.receiveSubscriber);
        this.connectionIndex = currentIndex + 1;
        this.activeReceivingSubscriber = receiver;
        final Disposable resumeSubscription = this.resumableFramesStore
                .resumeStream()
                .subscribe(frame -> duplexConnection.sendFrame(FrameHeaderCodec.streamId(frame), frame));
        duplexConnection.receive().subscribe((CoreSubscriber<? super ByteBuf>) receiver);
        duplexConnection.onClose().doFinally(signal -> {
            receiver.dispose();
            resumeSubscription.dispose();
            final Sinks.EmitResult result = this.onConnectionClosedSink.tryEmitNext(currentIndex);
            if (result != Sinks.EmitResult.OK) {
                logger.error("Failed to notify session of closed connection: {}", result);
            }
        }).subscribe();
    }

    /**
     * Disposes the current underlying connection without disposing this wrapper. Does nothing
     * when the wrapper itself has already been disposed.
     */
    public void disconnect() {
        final DuplexConnection current = this.activeConnection;
        if (current == DisposedConnection.INSTANCE) {
            return;
        }
        current.dispose();
    }

    /**
     * Queues an outgoing frame on {@code savableFramesSender}. Frames whose first argument is 0
     * go through the prioritized path; all others are queued normally.
     *
     * @param i stream id of the frame
     * @param byteBuf the frame to send
     */
    @Override // io.rsocket.DuplexConnection
    public void sendFrame(int i, ByteBuf byteBuf) {
        if (i != 0) {
            this.savableFramesSender.onNext(byteBuf);
            return;
        }
        this.savableFramesSender.onNextPrioritized(byteBuf);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return a stream of connection indexes, one emitted each time an underlying connection
     *     closes; backed by {@code onConnectionClosedSink}
     */
    public Flux<Integer> onActiveConnectionClosed() {
        final Flux<Integer> closedIndexes = this.onConnectionClosedSink.asFlux();
        return closedIndexes;
    }

    /**
     * Marks this wrapper as disposed, forwards the error to the current connection and, once
     * that connection has closed, releases internal resources and terminates {@code onClose}.
     * If the connection's close signal completes, {@code onClose} completes when the exception
     * has no cause and fails with that cause otherwise. If the close signal fails,
     * {@code onClose} fails with the same error. Repeated calls after disposal are ignored.
     *
     * @param rSocketErrorException the error to send before closing
     */
    @Override // io.rsocket.DuplexConnection
    public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
        final DuplexConnection previous = ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
        if (previous != DisposedConnection.INSTANCE) {
            previous.sendErrorAndClose(rSocketErrorException);

            // Cleanup shared by both terminal paths of the connection's close signal.
            final Runnable releaseResources = () -> {
                this.framesSaverDisposable.dispose();
                this.savableFramesSender.dispose();
                this.onConnectionClosedSink.tryEmitComplete();
            };

            previous.onClose().subscribe(null, closeError -> {
                releaseResources.run();
                this.onClose.onError(closeError);
            }, () -> {
                releaseResources.run();
                final Throwable cause = rSocketErrorException.getCause();
                if (cause != null) {
                    this.onClose.onError(cause);
                } else {
                    this.onClose.onComplete();
                }
            });
        }
    }

    /**
     * Returns the inbound frame stream, which is this object itself (the class extends
     * {@code Flux<ByteBuf>}); see {@code subscribe} for how a subscriber is attached.
     */
    @Override // io.rsocket.DuplexConnection
    public Flux<ByteBuf> receive() {
        return this;
    }

    /** @return the allocator of whichever connection is currently active */
    @Override // io.rsocket.DuplexConnection
    public ByteBufAllocator alloc() {
        final DuplexConnection current = this.activeConnection;
        return current.alloc();
    }

    /**
     * @return the signal that terminates when this wrapper is closed via {@code dispose()} or
     *     {@code sendErrorAndClose}
     */
    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        final Mono<Void> closeSignal = this.onClose;
        return closeSignal;
    }

    /**
     * Permanently closes this wrapper: marks it disposed, disposes the current underlying
     * connection, stops saving frames, detaches the receiving subscriber (if one was ever
     * attached), and completes both {@code onConnectionClosedSink} and {@code onClose}.
     *
     * <p>Only the first call has any effect; later calls return immediately.
     */
    @Override // reactor.core.Disposable
    public void dispose() {
        final DuplexConnection previous = ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
        if (previous == DisposedConnection.INSTANCE) {
            return;
        }
        if (previous != null) {
            previous.dispose();
        }
        this.framesSaverDisposable.dispose();
        // activeReceivingSubscriber is only assigned by initConnection(), which first runs on the
        // initial request(). Disposing earlier must not fail with a NullPointerException, or the
        // cleanup below would be skipped and onClose would never terminate.
        final FrameReceivingSubscriber receiver = this.activeReceivingSubscriber;
        if (receiver != null) {
            receiver.dispose();
        }
        this.savableFramesSender.dispose();
        this.onConnectionClosedSink.tryEmitComplete();
        this.onClose.onComplete();
    }

    /** @return whatever {@code onClose.isDisposed()} reports for the close signal */
    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        final MonoProcessor<Void> closeSignal = this.onClose;
        return closeSignal.isDisposed();
    }

    /**
     * @return the remote address captured from the connection given to the constructor; it is
     *     not refreshed when a new connection is swapped in
     */
    @Override // io.rsocket.DuplexConnection
    public SocketAddress remoteAddress() {
        final SocketAddress address = this.remoteAddress;
        return address;
    }

    /**
     * Handles demand from the downstream subscriber. The first call after subscription moves
     * the state from 1 to 2 and initializes the active connection; the requested amount itself
     * is not used, and later calls do nothing.
     *
     * @param j requested amount (ignored)
     */
    @Override // org.reactivestreams.Subscription
    public void request(long j) {
        if (this.state != 1) {
            return;
        }
        if (STATE.compareAndSet(this, 1, 2)) {
            initConnection(this.activeConnection);
        }
    }

    /** Cancelling the inbound subscription disposes the whole connection. */
    @Override // org.reactivestreams.Subscription
    public void cancel() {
        this.dispose();
    }

    /**
     * Attaches the single downstream subscriber of the inbound frame stream.
     *
     * <p>The first subscriber is stored and receives this object as its {@link Subscription}.
     * Any later subscriber is rejected with an {@link IllegalStateException}, delivered through
     * {@code onSubscribe} followed by {@code onError}, instead of being silently ignored and
     * left without any signal.
     *
     * @param coreSubscriber the subscriber that will receive inbound frames
     */
    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
        if (this.state == 0 && STATE.compareAndSet(this, 0, 1)) {
            this.receiveSubscriber = coreSubscriber;
            coreSubscriber.onSubscribe(this);
            return;
        }
        // Reactive Streams requires every subscribe() call to signal onSubscribe; a rejected
        // subscriber must then be told via onError rather than left hanging.
        Operators.error(coreSubscriber, new IllegalStateException("ResumableDuplexConnection allows only a single Subscriber"));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @param byteBuf the frame to inspect
     * @return {@code true} when the frame's stream id is non-zero
     */
    public static boolean isResumableFrame(ByteBuf byteBuf) {
        final int streamId = FrameHeaderCodec.streamId(byteBuf);
        return streamId != 0;
    }
}
