package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:io/rsocket/resume/ResumableDuplexConnection.class */
public class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
    private static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class);

    /** Shared error instance published on {@link #connectionErrors} when the inner connection is lost. */
    private static final Throwable closedChannelException = new ClosedChannelException();

    /** Label included in log messages. */
    private final String tag;

    /** Store used to save outgoing frames, track positions and replay frames on resumption. */
    private final ResumableFramesStore resumableFramesStore;

    /** Timeout applied to the store's resume stream in {@code doResume}. */
    private final Duration resumeStreamTimeout;

    /** When true, {@code onImpliedPosition} schedules a release of stored frames. */
    private final boolean cleanupOnKeepAlive;

    /** Most recently installed connection; assigned by {@code onNewConnection}. */
    private volatile DuplexConnection curConnection;

    /** Cached completion signal returned by every {@code send} call; built in the constructor. */
    private final Mono<Void> framesSent;

    /** Optional callback run after the resume-frame Mono succeeds in {@code doResume}. */
    private volatile Runnable onResume;

    /** Optional callback run by {@code disconnect(DuplexConnection)}. */
    private volatile Runnable onDisconnect;

    /** Current life-cycle state; holds one of the {@link State} constants. */
    private volatile int state;

    /** Emits every installed connection; created with a replay history of 1. */
    private final ReplayProcessor<DuplexConnection> connections = ReplayProcessor.create(1);

    /** Receives {@link #closedChannelException} each time the inner connection disconnects. */
    private final EmitterProcessor<Throwable> connectionErrors = EmitterProcessor.create();

    /** Frames handed to the current connection's {@code send}; created with a replay history of 0. */
    private final FluxProcessor<ByteBuf, ByteBuf> downStreamFrames = ReplayProcessor.create(0);

    /** Frames handed to {@code resumableFramesStore.saveFrames}. */
    private final FluxProcessor<ByteBuf, ByteBuf> resumeSaveFrames = EmitterProcessor.create();

    /** Subscribed to the result of {@code saveFrames}; merged into {@code onClose()}. */
    private final MonoProcessor<Void> resumeSaveCompleted = MonoProcessor.create();

    /** Pending frames ({@link ByteBuf}) and actions ({@link Runnable}) drained by {@code dispatch}. */
    private final Queue<Object> actions = Queues.<Object>unboundedMultiproducer().get();

    /** Work-in-progress counter guarding the drain loop in {@code dispatch}. */
    private final AtomicInteger actionsWip = new AtomicInteger();

    /** Set once by {@code dispose()}. */
    private final AtomicBoolean disposed = new AtomicBoolean();

    private final RequestListener downStreamRequestListener = new RequestListener();
    private final RequestListener resumeSaveStreamRequestListener = new RequestListener();

    /** Publishers passed to {@code send}; flattened into {@link #upstreamSubscriber} by the constructor. */
    private final UnicastProcessor<Flux<ByteBuf>> upstreams = UnicastProcessor.create();

    /** Subscriber for upstream frames; each frame it hands over goes through {@code dispatch}. */
    private final UpstreamFramesSubscriber upstreamSubscriber = new UpstreamFramesSubscriber(Queues.SMALL_BUFFER_SIZE, this.downStreamRequestListener.requests(), this.resumeSaveStreamRequestListener.requests(), this::dispatch);

    /** Handle of the current resume-stream subscriber; starts out as an already-disposed placeholder. */
    private volatile Disposable resumedStreamDisposable = Disposables.disposed();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.rsocket.resume.ResumableDuplexConnection$1, reason: invalid class name */
    /* loaded from: input_file:io/rsocket/resume/ResumableDuplexConnection$1.class */
    /**
     * Synthetic holder of the lookup table used by the {@code switch} in
     * {@code isResumableFrame}: it maps {@code FrameType.ordinal()} to a case index from 1 to 8.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per FrameType ordinal; slots never assigned below keep the int default of 0.
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$frame$FrameType = new int[FrameType.values().length];

        static {
            // Each constant is mapped in its own try block, so a NoSuchFieldError raised for one
            // constant is swallowed and leaves only that slot unmapped.
            // NOTE(review): presumably this guards against a FrameType version that lacks a
            // constant at runtime - confirm before changing.
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_CHANNEL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_STREAM.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_RESPONSE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_FNF.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.REQUEST_N.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.CANCEL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.ERROR.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.PAYLOAD.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
        }
    }

    /**
     * Queued action that releases stored frames up to the implied position reported by the
     * remote side.
     */
    private class ReleaseFrames implements Runnable {
        private final long remoteImpliedPos;

        public ReleaseFrames(long remoteImpliedPos) {
            this.remoteImpliedPos = remoteImpliedPos;
        }

        /** Delegates to the enclosing connection's {@code releaseFramesToPosition}. */
        @Override // java.lang.Runnable
        public void run() {
            releaseFramesToPosition(remoteImpliedPos);
        }
    }

    /**
     * Queued action carrying the remote positions and the resume-frame callback handed to
     * {@code resume}; running it invokes {@code doResume} on the enclosing connection.
     */
    class Resume implements Runnable {
        private final long remotePos;
        private final long remoteImpliedPos;
        private final Function<Mono<Long>, Mono<Void>> resumeFrameSent;

        public Resume(long remotePos, long remoteImpliedPos, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
            this.remotePos = remotePos;
            this.remoteImpliedPos = remoteImpliedPos;
            this.resumeFrameSent = resumeFrameSent;
        }

        @Override // java.lang.Runnable
        public void run() {
            doResume(remotePos, remoteImpliedPos, resumeFrameSent);
        }
    }

    /** Queued action that invokes {@code doResumeComplete} on the enclosing connection. */
    private class ResumeComplete implements Runnable {
        private ResumeComplete() {
        }

        /**
         * Synthetic accessor constructor; both arguments are ignored. It is the constructor
         * called from {@code doResume}.
         */
        /* synthetic */ ResumeComplete(ResumableDuplexConnection outer, AnonymousClass1 unused) {
            this();
        }

        @Override // java.lang.Runnable
        public void run() {
            doResumeComplete();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queued action that hands a replacement connection to {@code doResumeStart} on the
     * enclosing connection.
     */
    public class ResumeStart implements Runnable {
        private final DuplexConnection connection;

        public ResumeStart(DuplexConnection connection) {
            this.connection = connection;
        }

        @Override // java.lang.Runnable
        public void run() {
            doResumeStart(connection);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Integer life-cycle states stored in the enclosing connection's {@code state} field.
     * The values are constants and are therefore declared {@code final}.
     */
    public static class State {
        /** Set by {@code reconnect} when the very first connection is installed. */
        static final int CONNECTED = 0;
        /** Set by {@code doResumeStart} when a replacement connection arrives. */
        static final int RESUME_STARTED = 1;
        /** Set by {@code doResume} once the local and remote positions are found compatible. */
        static final int RESUME = 2;
        /** Set by {@code doResumeComplete}. */
        static final int RESUME_COMPLETED = 3;
        /** Set by {@code disconnect(DuplexConnection)} when the current connection is dropped. */
        static final int DISCONNECTED = 4;

        State() {
        }
    }

    /**
     * Wires the frame pipelines and installs the first connection.
     *
     * <p>In order: frames pushed to {@code resumeSaveFrames} are handed to the store's
     * {@code saveFrames}; publishers given to {@code send} are flattened into the upstream
     * subscriber; {@code framesSent} is built so that every installed connection is sent the
     * {@code downStreamFrames} stream; finally the initial connection is installed through
     * {@code reconnect}.
     *
     * @param tag label included in log messages
     * @param initialConnection first connection to install
     * @param resumableFramesStore store used to save and replay frames
     * @param resumeStreamTimeout timeout later applied to the store's resume stream
     * @param cleanupOnKeepAlive whether {@code onImpliedPosition} releases stored frames
     */
    public ResumableDuplexConnection(String tag, DuplexConnection initialConnection, ResumableFramesStore resumableFramesStore, Duration resumeStreamTimeout, boolean cleanupOnKeepAlive) {
        this.tag = tag;
        this.resumableFramesStore = resumableFramesStore;
        this.resumeStreamTimeout = resumeStreamTimeout;
        this.cleanupOnKeepAlive = cleanupOnKeepAlive;

        resumableFramesStore
                .saveFrames(this.resumeSaveStreamRequestListener.apply(this.resumeSaveFrames))
                .subscribe(this.resumeSaveCompleted);

        this.upstreams.flatMap(Function.identity()).subscribe(this.upstreamSubscriber);

        this.framesSent = this.connections
                .switchMap(connection -> {
                    logger.debug("Switching transport: {}", tag);
                    return connection
                            .send(this.downStreamRequestListener.apply(this.downStreamFrames))
                            .doFinally(signal -> logger.debug("{} Transport send completed: {}, {}", tag, signal, connection.toString()))
                            // A failed transport send never terminates the outer stream.
                            .onErrorResume(err -> Mono.never());
                })
                .then()
                .cache();

        reconnect(initialConnection);
    }

    /** Returns the allocator of the connection currently held in {@code curConnection}. */
    @Override // io.rsocket.DuplexConnection
    public ByteBufAllocator alloc() {
        DuplexConnection active = this.curConnection;
        return active.alloc();
    }

    /** Disconnects the current inner connection, if one has been installed. */
    public void disconnect() {
        DuplexConnection active = this.curConnection;
        if (active == null) {
            return;
        }
        disconnect(active);
    }

    /**
     * Registers the callback run when the inner connection is disconnected; replaces any
     * previously registered callback.
     */
    public void onDisconnect(Runnable callback) {
        this.onDisconnect = callback;
    }

    /**
     * Registers the callback run after the resume-frame Mono succeeds in {@code doResume};
     * replaces any previously registered callback.
     */
    public void onResume(Runnable callback) {
        this.onResume = callback;
    }

    /**
     * Installs {@code duplexConnection}. The first call (no current connection yet) marks the
     * state as connected and installs it directly; later calls queue a {@code ResumeStart}
     * action instead.
     */
    public void reconnect(DuplexConnection duplexConnection) {
        if (this.curConnection == null) {
            logger.debug("{} Resumable duplex connection started with connection: {}", this.tag, duplexConnection);
            this.state = State.CONNECTED;
            onNewConnection(duplexConnection);
            return;
        }
        logger.debug("{} Resumable duplex connection reconnected with connection: {}", this.tag, duplexConnection);
        dispatch(new ResumeStart(duplexConnection));
    }

    /**
     * Queues a {@code Resume} action for the given remote positions.
     *
     * @param remotePos position reported by the remote side
     * @param remoteImpliedPos implied position reported by the remote side
     * @param resumeFrameSent callback applied by {@code doResume} to the resumption outcome
     */
    public void resume(long remotePos, long remoteImpliedPos, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
        Resume action = new Resume(remotePos, remoteImpliedPos, resumeFrameSent);
        dispatch(action);
    }

    /** Sends a single frame directly through the connection currently held in {@code curConnection}. */
    @Override // io.rsocket.DuplexConnection
    public Mono<Void> sendOne(ByteBuf byteBuf) {
        DuplexConnection active = this.curConnection;
        return active.sendOne(byteBuf);
    }

    /**
     * Registers {@code publisher} as an additional upstream frame source and returns the
     * shared {@code framesSent} signal.
     */
    @Override // io.rsocket.DuplexConnection
    public Mono<Void> send(Publisher<ByteBuf> publisher) {
        Flux<ByteBuf> frames = Flux.from(publisher);
        upstreams.onNext(frames);
        return framesSent;
    }

    /**
     * Returns the inbound frames of whichever connection is currently installed, switching
     * to each new connection as it is emitted. Every resumable frame is reported to the
     * frames store, and a receive error on one connection is replaced by a never-ending
     * stream instead of terminating the result.
     */
    @Override // io.rsocket.DuplexConnection
    public Flux<ByteBuf> receive() {
        return this.connections.switchMap(connection ->
                connection.receive()
                        .doOnNext(frame -> {
                            if (isResumableFrame(frame)) {
                                this.resumableFramesStore.resumableFrameReceived(frame);
                            }
                        })
                        .onErrorResume(err -> Mono.never()));
    }

    /** Returns the frame position reported by the frames store. */
    public long position() {
        return resumableFramesStore.framePosition();
    }

    /** Returns the implied frame position reported by the frames store. */
    @Override // io.rsocket.resume.ResumeStateHolder
    public long impliedPosition() {
        return resumableFramesStore.frameImpliedPosition();
    }

    /**
     * Logs the remote position and, when cleanup on keep-alive is enabled, queues a release
     * of stored frames up to that position.
     */
    @Override // io.rsocket.resume.ResumeStateHolder
    public void onImpliedPosition(long remoteImpliedPos) {
        logger.debug("Got remote position from keep-alive: {}", remoteImpliedPos);
        if (!this.cleanupOnKeepAlive) {
            return;
        }
        dispatch(new ReleaseFrames(remoteImpliedPos));
    }

    /**
     * Completes once both the last installed connection has closed and the frame-saving
     * subscription ({@code resumeSaveCompleted}) has terminated.
     */
    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        Mono<Void> lastConnectionClosed = this.connections.last().flatMap(DuplexConnection::onClose);
        return Flux.merge(lastConnectionClosed, this.resumeSaveCompleted).then();
    }

    /**
     * Shuts the connection down. Only the first call has any effect: it completes the
     * internal processors, then disposes the current connection, the upstream subscriber,
     * the resume-stream subscriber and the frames store, in that order.
     */
    public void dispose() {
        if (!disposed.compareAndSet(false, true)) {
            return;
        }
        logger.debug("Resumable connection disposed: {}, {}", tag, this);
        upstreams.onComplete();
        connections.onComplete();
        connectionErrors.onComplete();
        resumeSaveFrames.onComplete();
        curConnection.dispose();
        upstreamSubscriber.dispose();
        resumedStreamDisposable.dispose();
        resumableFramesStore.dispose();
    }

    /** Returns the availability reported by the connection currently held in {@code curConnection}. */
    @Override // io.rsocket.DuplexConnection, io.rsocket.Availability
    public double availability() {
        DuplexConnection active = this.curConnection;
        return active.availability();
    }

    /** Returns {@code true} once {@code dispose()} has been called. */
    public boolean isDisposed() {
        return disposed.get();
    }

    /**
     * Routes one outgoing frame. After disposal the frame is released. Otherwise a resumable
     * frame is passed to the save stream unless the state is {@code RESUME}, and the frame
     * is passed downstream unless the state is {@code RESUME_STARTED}.
     *
     * <p>NOTE(review): a non-resumable frame arriving in {@code RESUME_STARTED} is neither
     * saved, forwarded nor released here - confirm who owns that buffer.
     */
    private void sendFrame(ByteBuf frame) {
        if (disposed.get()) {
            frame.release();
            return;
        }
        if (this.state != State.RESUME) {
            if (isResumableFrame(frame)) {
                resumeSaveFrames.onNext(frame);
            }
        }
        if (this.state != State.RESUME_STARTED) {
            downStreamFrames.onNext(frame);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the stream that receives an error each time the inner connection disconnects. */
    public Flux<Throwable> connectionErrors() {
        return connectionErrors;
    }

    /**
     * Enqueues {@code obj} and, unless a drain loop is already running, drains the action
     * queue on the calling thread.
     *
     * <p>{@code actionsWip} counts enqueued-but-unprocessed actions. A caller that sees a
     * non-zero previous count returns immediately and leaves its action to the loop that is
     * already running.
     *
     * @param obj a {@link ByteBuf} frame (passed to {@code sendFrame}) or a {@link Runnable}
     *     action (run in place)
     */
    private void dispatch(Object obj) {
        this.actions.offer(obj);
        if (this.actionsWip.getAndIncrement() != 0) {
            // A drain loop is already active; it will pick this action up.
            return;
        }
        do {
            Object poll = this.actions.poll();
            if (poll instanceof ByteBuf) {
                sendFrame((ByteBuf) poll);
            } else {
                ((Runnable) poll).run();
            }
            // Keep looping while actions enqueued in the meantime remain to be processed.
        } while (this.actionsWip.decrementAndGet() != 0);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Begins resumption on a replacement connection: marks the state {@code RESUME_STARTED},
     * disposes the previous resume-stream subscriber, notifies the upstream subscriber, and
     * installs {@code connection}.
     */
    public void doResumeStart(DuplexConnection connection) {
        state = State.RESUME_STARTED;
        resumedStreamDisposable.dispose();
        upstreamSubscriber.resumeStart();
        onNewConnection(connection);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Performs resumption against the remote positions.
     *
     * <p>When the local and remote positions are compatible, the state becomes
     * {@code RESUME}, stored frames up to the remote implied position are released, and the
     * local implied position is passed to {@code resumeFrameSent}. Otherwise a
     * {@code ResumeStateException} is passed instead. After the callback's Mono succeeds,
     * the {@code onResume} hook runs and the store's resume stream is replayed; a
     * {@code ResumeComplete} action is queued when that stream terminates. An error while
     * replaying disposes this connection, and any error in the whole chain is swallowed
     * before subscribing.
     *
     * @param remotePos position reported by the remote side
     * @param remoteImpliedPos implied position reported by the remote side
     * @param resumeFrameSent callback applied to the resumption outcome
     */
    public void doResume(long remotePos, long remoteImpliedPos, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
        final long localPos = position();
        final long localImpliedPos = impliedPosition();
        logger.debug("Resumption start");
        logger.debug("Resumption states. local: [pos: {}, impliedPos: {}], remote: [pos: {}, impliedPos: {}]", localPos, localImpliedPos, remotePos, remoteImpliedPos);

        final long releaseUpTo = calculateRemoteImpliedPos(localPos, localImpliedPos, remotePos, remoteImpliedPos);
        final Mono<Long> outcome;
        if (releaseUpTo < 0) {
            outcome = Mono.error(new ResumeStateException(localPos, localImpliedPos, remotePos, remoteImpliedPos));
        } else {
            this.state = State.RESUME;
            releaseFramesToPosition(releaseUpTo);
            outcome = Mono.just(localImpliedPos);
        }

        // The callback is applied before the resume stream is requested from the store.
        Mono<Void> resumeAcknowledged = resumeFrameSent.apply(outcome).doOnSuccess(ignored -> {
            Runnable hook = this.onResume;
            if (hook != null) {
                hook.run();
            }
        });
        Mono<Void> storedFramesReplayed = streamResumedFrames(
                this.resumableFramesStore
                        .resumeStream()
                        .timeout(this.resumeStreamTimeout)
                        .doFinally(signal -> dispatch(new ResumeComplete(this, null))))
                .doOnError(err -> dispose());

        resumeAcknowledged
                .then(storedFramesReplayed)
                .onErrorResume(err -> Mono.empty())
                .subscribe();
    }

    /**
     * Checks whether the local and remote positions allow resumption.
     *
     * @param pos local position
     * @param impliedPos local implied position
     * @param remotePos remote position
     * @param remoteImpliedPos remote implied position
     * @return {@code remoteImpliedPos} when {@code remotePos <= impliedPos} and
     *     {@code pos <= remoteImpliedPos}; {@code -1} otherwise
     */
    static long calculateRemoteImpliedPos(long pos, long impliedPos, long remotePos, long remoteImpliedPos) {
        boolean resumable = remotePos <= impliedPos && pos <= remoteImpliedPos;
        return resumable ? remoteImpliedPos : -1L;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Marks the state {@code RESUME_COMPLETED} and notifies the upstream subscriber. */
    public void doResumeComplete() {
        logger.debug("Completing resumption");
        state = State.RESUME_COMPLETED;
        upstreamSubscriber.resumeComplete();
    }

    /**
     * Replays {@code frames} through {@code dispatch} and reports the outcome through the
     * returned Mono.
     *
     * <p>On subscription a {@code ResumeFramesSubscriber} is created, registered for disposal
     * with the Mono's sink, remembered in {@code resumedStreamDisposable}, and subscribed to
     * {@code frames}. The subscriber is handed the sink's {@code error} and {@code success}
     * methods as its error and completion callbacks.
     *
     * @param frames stored frames to replay
     * @return a Mono driven by the subscriber's error and completion callbacks
     */
    private Mono<Void> streamResumedFrames(Flux<ByteBuf> frames) {
        return Mono.create(sink -> {
            // Method references are passed directly; raw Consumer locals would not type-check.
            ResumeFramesSubscriber subscriber = new ResumeFramesSubscriber(this.downStreamRequestListener.requests(), this::dispatch, sink::error, sink::success);
            sink.onDispose(subscriber);
            this.resumedStreamDisposable = subscriber;
            frames.subscribe(subscriber);
        });
    }

    /**
     * Makes {@code connection} the current connection, arranges for {@code disconnect} to be
     * called when it closes, and publishes it to {@code connections}.
     */
    private void onNewConnection(DuplexConnection connection) {
        this.curConnection = connection;
        Mono<Void> closed = connection.onClose();
        closed.doFinally(signal -> disconnect(connection)).subscribe();
        this.connections.onNext(connection);
    }

    /**
     * Handles loss of {@code connection}. Does nothing unless it is the current connection
     * and the state is not already {@code DISCONNECTED}. Otherwise it disposes the
     * connection, marks the state {@code DISCONNECTED}, publishes the shared closed-channel
     * error and runs the {@code onDisconnect} callback, if any.
     */
    private void disconnect(DuplexConnection connection) {
        boolean stillActive = this.curConnection == connection && this.state != State.DISCONNECTED;
        if (!stillActive) {
            return;
        }
        connection.dispose();
        this.state = State.DISCONNECTED;
        logger.debug("{} Inner connection disconnected: {}", this.tag, closedChannelException.getClass().getSimpleName());
        this.connectionErrors.onNext(closedChannelException);
        Runnable hook = this.onDisconnect;
        if (hook != null) {
            hook.run();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Asks the frames store to release frames up to {@code remoteImpliedPos}. */
    public void releaseFramesToPosition(long remoteImpliedPos) {
        resumableFramesStore.releaseFrames(remoteImpliedPos);
    }

    /**
     * Tells whether a frame takes part in resumption, based on its native frame type.
     *
     * <p>The switch is on the {@code FrameType} constant itself rather than on an ordinal
     * lookup table with numeric case labels, so each case names the frame type it matches.
     *
     * @param byteBuf frame whose header is inspected
     * @return {@code true} for REQUEST_CHANNEL, REQUEST_STREAM, REQUEST_RESPONSE,
     *     REQUEST_FNF, REQUEST_N, CANCEL, ERROR and PAYLOAD frames; {@code false} otherwise
     */
    static boolean isResumableFrame(ByteBuf byteBuf) {
        switch (FrameHeaderCodec.nativeFrameType(byteBuf)) {
            case REQUEST_CHANNEL:
            case REQUEST_STREAM:
            case REQUEST_RESPONSE:
            case REQUEST_FNF:
            case REQUEST_N:
            case CANCEL:
            case ERROR:
            case PAYLOAD:
                return true;
            default:
                return false;
        }
    }
}
