package io.rsocket;

import io.rsocket.Frame;
import io.rsocket.KeepAliveHandler;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.framing.FrameType;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:io/rsocket/RSocketClient.class */
class RSocketClient implements RSocket {
    /** Transport used to send outgoing frames and receive incoming ones. */
    private final DuplexConnection connection;
    /** Converts an incoming NEXT / NEXT_COMPLETE frame into the payload handed to the stream's receiver. */
    private final Function<Frame, ? extends Payload> frameDecoder;
    /** Receives errors that cannot be delivered to a specific stream subscriber. */
    private final Consumer<Throwable> errorConsumer;
    /** Supplies the stream id assigned to each new request. */
    private final StreamIdSupplier streamIdSupplier;
    /** Outbound request publishers of open channels, keyed by stream id (populated by handleChannel). */
    private final NonBlockingHashMapLong<LimitableRequestPublisher> senders;
    /** Response processors of in-flight requests, keyed by stream id. */
    private final NonBlockingHashMapLong<UnicastProcessor<Payload>> receivers;
    /** Outgoing frame queue; handed to {@code connection.send(...)} in the constructor. */
    private final UnboundedProcessor<Frame> sendProcessor;
    /** Keep-alive handler; {@code null} when the constructor's first Duration argument equals Duration.ZERO. */
    private KeepAliveHandler keepAliveHandler;
    /** Holds the error recorded when an ERROR frame arrives on stream 0, if any. */
    private final Lifecycle lifecycle;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/rsocket/RSocketClient$Lifecycle.class */
    /**
     * Remembers the error, if any, that terminated the connection so that new
     * requests can be rejected with it.
     */
    public static class Lifecycle {
        /** Error passed to the most recent {@link #terminate(Throwable)} call; {@code null} if none. */
        private volatile Throwable terminationError;

        private Lifecycle() {
        }

        /**
         * Returns a Mono that, on subscription, completes empty when no
         * termination error is recorded and otherwise fails with that error.
         *
         * @return a Mono reflecting the state at subscription time
         */
        public Mono<Void> started() {
            return Mono.create(sink -> {
                final Throwable failure = terminationError;
                if (failure != null) {
                    sink.error(failure);
                    return;
                }
                sink.success();
            });
        }

        /**
         * Records the given error as the termination error.
         *
         * @param th the error to store
         */
        public void terminate(Throwable th) {
            terminationError = th;
        }

        /**
         * @return the recorded termination error, or {@code null} if none was recorded
         */
        public Throwable terminationError() {
            return terminationError;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a client with keep-alive disabled: delegates to the full
     * constructor with two zero durations and a count of zero.
     *
     * @param duplexConnection transport to send frames on and receive frames from
     * @param function decoder applied to incoming payload-carrying frames
     * @param consumer receiver for errors not tied to a stream subscriber
     * @param streamIdSupplier source of stream ids for new requests
     */
    public RSocketClient(
            DuplexConnection duplexConnection,
            Function<Frame, ? extends Payload> function,
            Consumer<Throwable> consumer,
            StreamIdSupplier streamIdSupplier) {
        this(
                duplexConnection,
                function,
                consumer,
                streamIdSupplier,
                Duration.ZERO,
                Duration.ZERO,
                0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a client bound to the given connection and wires up its
     * subscriptions: cleanup when the connection closes, the outgoing frame
     * queue into {@code send}, and incoming frames into
     * {@code handleIncomingFrames}. When {@code duration} is not
     * {@link Duration#ZERO}, a client keep-alive handler is also started.
     *
     * @param duplexConnection transport to send frames on and receive frames from
     * @param function decoder applied to incoming payload-carrying frames
     * @param consumer receiver for errors not tied to a stream subscriber
     * @param streamIdSupplier source of stream ids for new requests
     * @param duration first keep-alive setting; {@code Duration.ZERO} disables keep-alive
     *     (presumably the tick period — confirm against KeepAliveHandler.KeepAlive)
     * @param duration2 second keep-alive setting (presumably the ack timeout — confirm)
     * @param i third keep-alive setting (presumably the allowed missed acks — confirm)
     */
    public RSocketClient(DuplexConnection duplexConnection, Function<Frame, ? extends Payload> function, Consumer<Throwable> consumer, StreamIdSupplier streamIdSupplier, Duration duration, Duration duration2, int i) {
        this.lifecycle = new Lifecycle();
        this.connection = duplexConnection;
        this.frameDecoder = function;
        this.errorConsumer = consumer;
        this.streamIdSupplier = streamIdSupplier;
        // NOTE(review): a frame-header flag constant is used as the initial map size;
        // presumably the decompiler substituted it for a numeric literal — confirm.
        this.senders = new NonBlockingHashMapLong<>(FrameHeaderFlyweight.FLAGS_M);
        this.receivers = new NonBlockingHashMapLong<>(FrameHeaderFlyweight.FLAGS_M);
        this.sendProcessor = new UnboundedProcessor<>();

        // Release all per-stream state once the connection closes, however it closes.
        duplexConnection.onClose()
                .doFinally(signalType -> cleanup())
                .subscribe(null, consumer);

        // Pump queued frames into the connection; notify streams if the pump stops.
        duplexConnection.send(this.sendProcessor)
                .doFinally(this::handleSendProcessorCancel)
                .subscribe(null, this::handleSendProcessorError);

        duplexConnection.receive().subscribe(this::handleIncomingFrames, consumer);

        if (Duration.ZERO.equals(duration)) {
            this.keepAliveHandler = null;
            return;
        }

        this.keepAliveHandler = KeepAliveHandler.ofClient(new KeepAliveHandler.KeepAlive(duration, duration2, i));
        // A keep-alive timeout is reported and then tears the connection down.
        this.keepAliveHandler.timeout().subscribe(keepAlive -> {
            consumer.accept(new ConnectionErrorException(String.format("No keep-alive acks for %d ms", keepAlive.getTimeoutMillis())));
            duplexConnection.dispose();
        });
        // Keep-alive frames share the regular outgoing queue.
        this.keepAliveHandler.send().subscribe(this.sendProcessor::onNext);
    }

    /**
     * Reacts to a failure of the outgoing frame pump: every registered receiver
     * is failed and every registered sender is cancelled.
     *
     * <p>Receivers get the recorded termination error when one exists,
     * otherwise {@code th}. An exception thrown by a receiver's
     * {@code onError} is forwarded to the error consumer.
     *
     * @param th the error that ended the send subscription
     */
    private void handleSendProcessorError(Throwable th) {
        final Throwable recorded = this.lifecycle.terminationError();
        final Throwable cause = recorded == null ? th : recorded;

        for (Subscriber<Payload> receiver : this.receivers.values()) {
            try {
                receiver.onError(cause);
            } catch (Throwable failure) {
                this.errorConsumer.accept(failure);
            }
        }

        for (LimitableRequestPublisher<?> sender : this.senders.values()) {
            sender.cancel();
        }
    }

    /**
     * Runs when the outgoing frame pump terminates. Does nothing for
     * {@link SignalType#ON_ERROR} (that case is covered by
     * {@code handleSendProcessorError}); for any other signal, each receiver is
     * failed with a fresh "closed connection" Throwable and each sender is
     * cancelled.
     *
     * @param signalType the signal that terminated the send subscription
     */
    private void handleSendProcessorCancel(SignalType signalType) {
        if (signalType != SignalType.ON_ERROR) {
            for (Subscriber<Payload> receiver : this.receivers.values()) {
                try {
                    receiver.onError(new Throwable("closed connection"));
                } catch (Throwable failure) {
                    // A misbehaving subscriber must not stop the remaining notifications.
                    this.errorConsumer.accept(failure);
                }
            }

            for (LimitableRequestPublisher<?> sender : this.senders.values()) {
                sender.cancel();
            }
        }
    }

    /**
     * Sends a fire-and-forget request; see {@code handleFireAndForget}.
     *
     * @param payload the request payload
     * @return a Mono for the send operation
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        final Mono<Void> sent = handleFireAndForget(payload);
        return sent;
    }

    /**
     * Sends a request-response request; see {@code handleRequestResponse}.
     *
     * @param payload the request payload
     * @return a Mono for the response
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        final Mono<Payload> response = handleRequestResponse(payload);
        return response;
    }

    /**
     * Sends a request-stream request; see {@code handleRequestStream}.
     *
     * @param payload the request payload
     * @return a Flux of the responses
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        final Flux<Payload> responses = handleRequestStream(payload);
        return responses;
    }

    /**
     * Opens a channel: the publisher is adapted to a Flux and handed to
     * {@code handleChannel}.
     *
     * @param publisher source of the outbound payloads
     * @return a Flux of the inbound payloads
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        final Flux<Payload> outbound = Flux.from(publisher);
        return handleChannel(outbound);
    }

    /**
     * Returns a Mono that, when subscribed, builds a METADATA_PUSH frame on
     * stream 0 from the payload, releases the payload and queues the frame for
     * sending.
     *
     * @param payload the payload to push; released once the frame has been built
     * @return a Mono wrapping the send action
     */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        final Runnable pushMetadata = () -> {
            final Frame metadataFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1);
            payload.release();
            sendProcessor.onNext(metadataFrame);
        };
        return Mono.fromRunnable(pushMetadata);
    }

    /**
     * @return the availability reported by the underlying connection
     */
    @Override
    public double availability() {
        final double connectionAvailability = connection.availability();
        return connectionAvailability;
    }

    /** Disposes the underlying connection. */
    public void dispose() {
        connection.dispose();
    }

    /**
     * @return whether the underlying connection reports itself as disposed
     */
    public boolean isDisposed() {
        final boolean disposed = connection.isDisposed();
        return disposed;
    }

    /**
     * @return the underlying connection's close signal
     */
    @Override
    public Mono<Void> onClose() {
        final Mono<Void> closed = connection.onClose();
        return closed;
    }

    /**
     * Builds the fire-and-forget pipeline: after the lifecycle check passes, a
     * REQUEST_FNF frame with a freshly allocated stream id is created, the
     * payload is released and the frame is queued for sending.
     *
     * @param payload the request payload; released once the frame has been built
     * @return a Mono that fails with the termination error if one is recorded
     */
    private Mono<Void> handleFireAndForget(Payload payload) {
        final Mono<Void> sendRequest = Mono.fromRunnable(() -> {
            final int streamId = streamIdSupplier.nextStreamId();
            final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_FNF, payload, 1);
            payload.release();
            sendProcessor.onNext(requestFrame);
        });
        return lifecycle.started().then(sendRequest);
    }

    /**
     * Builds the request-stream pipeline. Per subscription a stream id is
     * allocated and a receiver registered under it. The first downstream
     * request sends the REQUEST_STREAM frame (and releases the payload); later
     * requests send REQUEST_N frames. Errors and cancellation are relayed as
     * ERROR / CANCEL frames while the stream is still registered, and the
     * receiver is unregistered on termination.
     *
     * @param payload the request payload; released when the request frame is built
     * @return a Flux of the response payloads
     */
    private Flux<Payload> handleRequestStream(Payload payload) {
        final Flux<Payload> responses = Flux.defer(() -> {
            final int streamId = streamIdSupplier.nextStreamId();
            final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
            receivers.put(streamId, receiver);
            final AtomicBoolean requestSent = new AtomicBoolean(false);

            return receiver
                    .doOnRequest(n -> {
                        if (requestSent.compareAndSet(false, true) && !receiver.isDisposed()) {
                            // The initial demand travels inside the request frame itself.
                            final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_STREAM, payload, n);
                            payload.release();
                            sendProcessor.onNext(requestFrame);
                        } else if (contains(streamId) && !receiver.isDisposed()) {
                            sendProcessor.onNext(Frame.RequestN.from(streamId, n));
                        }
                        sendProcessor.drain();
                    })
                    .doOnError(error -> {
                        if (contains(streamId) && !receiver.isDisposed()) {
                            sendProcessor.onNext(Frame.Error.from(streamId, error));
                        }
                    })
                    .doOnCancel(() -> {
                        if (contains(streamId) && !receiver.isDisposed()) {
                            sendProcessor.onNext(Frame.Cancel.from(streamId));
                        }
                    })
                    .doFinally(signal -> {
                        receivers.remove(streamId);
                    });
        });
        return lifecycle.started().thenMany(responses);
    }

    /**
     * Builds the request-response pipeline. Per subscription a stream id is
     * allocated, the REQUEST_RESPONSE frame is built (releasing the payload), a
     * receiver is registered and the frame is queued. Errors and cancellation
     * are relayed as ERROR / CANCEL frames, and the receiver is unregistered on
     * termination.
     *
     * @param payload the request payload; released once the frame has been built
     * @return a Mono of the single response, or empty
     */
    private Mono<Payload> handleRequestResponse(Payload payload) {
        final Mono<Payload> response = Mono.defer(() -> {
            final int streamId = streamIdSupplier.nextStreamId();
            final Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1);
            payload.release();

            final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
            // Register before sending so the response can be routed to the receiver.
            receivers.put(streamId, receiver);
            sendProcessor.onNext(requestFrame);

            return receiver
                    .singleOrEmpty()
                    .doOnError(error -> sendProcessor.onNext(Frame.Error.from(streamId, error)))
                    .doOnCancel(() -> sendProcessor.onNext(Frame.Cancel.from(streamId)))
                    .doFinally(signal -> receivers.remove(streamId));
        });
        return lifecycle.started().then(response);
    }

    /**
     * Builds the request-channel pipeline.
     *
     * <p>Per subscription a receiver and a stream id are created. The first
     * downstream request subscribes to the outbound {@code flux}: it is wrapped
     * in a {@link LimitableRequestPublisher} (initial limit 1), the sender and
     * receiver are registered, and each payload becomes a frame — REQUEST_CHANNEL
     * for the first payload (carrying the initial demand), NEXT afterwards.
     * Later downstream requests send REQUEST_N frames. Outbound completion
     * sends COMPLETE while the stream is registered; if no payload was ever
     * emitted, the receiver is completed too. Errors and cancellation are
     * relayed as ERROR / CANCEL frames, and both registrations are removed on
     * termination.
     *
     * @param flux the outbound payloads; each payload is released once framed
     * @return a Flux of the inbound payloads
     */
    private Flux<Payload> handleChannel(Flux<Payload> flux) {
        return this.lifecycle.started().thenMany(Flux.defer(() -> {
            final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
            final int streamId = this.streamIdSupplier.nextStreamId();
            final AtomicBoolean firstRequest = new AtomicBoolean(true);

            return receiver
                    .doOnRequest(n -> {
                        if (!firstRequest.compareAndSet(true, false)) {
                            // Subsequent demand: only forward while the stream is still live.
                            if (contains(streamId) && !receiver.isDisposed()) {
                                this.sendProcessor.onNext(Frame.RequestN.from(streamId, n));
                            }
                            return;
                        }

                        final AtomicBoolean firstPayload = new AtomicBoolean(true);
                        final Flux<Frame> requestFrames = flux
                                .transform(source -> {
                                    // assumes LimitableRequestPublisher.wrap is generic in its element type — TODO confirm
                                    final LimitableRequestPublisher<Payload> wrapped = LimitableRequestPublisher.wrap(source);
                                    // Allow exactly one element so the request frame can go out.
                                    wrapped.increaseRequestLimit(1L);
                                    this.senders.put(streamId, wrapped);
                                    this.receivers.put(streamId, receiver);
                                    return wrapped;
                                })
                                .map(payload -> {
                                    final Frame requestFrame = firstPayload.compareAndSet(true, false)
                                            ? Frame.Request.from(streamId, FrameType.REQUEST_CHANNEL, payload, n)
                                            : Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload);
                                    payload.release();
                                    return requestFrame;
                                })
                                .doOnComplete(() -> {
                                    if (contains(streamId) && !receiver.isDisposed()) {
                                        this.sendProcessor.onNext(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE));
                                    }
                                    // Nothing was ever sent, so no response can arrive: finish the receiver.
                                    if (firstPayload.get()) {
                                        receiver.onComplete();
                                    }
                                });

                        requestFrames.subscribe(
                                this.sendProcessor::onNext,
                                error -> {
                                    this.errorConsumer.accept(error);
                                    receiver.dispose();
                                });
                    })
                    .doOnError(error -> {
                        if (contains(streamId) && !receiver.isDisposed()) {
                            this.sendProcessor.onNext(Frame.Error.from(streamId, error));
                        }
                    })
                    .doOnCancel(() -> {
                        if (contains(streamId) && !receiver.isDisposed()) {
                            this.sendProcessor.onNext(Frame.Cancel.from(streamId));
                        }
                    })
                    .doFinally(signal -> {
                        this.receivers.remove(streamId);
                        final LimitableRequestPublisher<?> sender = this.senders.remove(streamId);
                        if (sender != null) {
                            sender.cancel();
                        }
                    });
        }));
    }

    /**
     * @param i a stream id
     * @return whether a receiver is currently registered under that stream id
     */
    private boolean contains(int i) {
        final boolean registered = receivers.containsKey(i);
        return registered;
    }

    /**
     * Releases all per-connection state: disposes the keep-alive handler if
     * present, cleans up every registered receiver and sender, and finally
     * (even if the clean-up loops throw) clears both maps and disposes the
     * outgoing frame queue.
     */
    protected void cleanup() {
        final KeepAliveHandler handler = this.keepAliveHandler;
        if (handler != null) {
            handler.dispose();
        }
        try {
            for (UnicastProcessor<Payload> receiver : this.receivers.values()) {
                cleanUpSubscriber(receiver);
            }
            for (LimitableRequestPublisher<?> sender : this.senders.values()) {
                cleanUpLimitableRequestPublisher(sender);
            }
        } finally {
            this.senders.clear();
            this.receivers.clear();
            this.sendProcessor.dispose();
        }
    }

    /**
     * Cancels the given sender, forwarding anything it throws to the error
     * consumer instead of propagating it.
     *
     * @param limitableRequestPublisher the sender to cancel
     */
    private synchronized void cleanUpLimitableRequestPublisher(LimitableRequestPublisher<?> limitableRequestPublisher) {
        try {
            limitableRequestPublisher.cancel();
        } catch (Throwable failure) {
            errorConsumer.accept(failure);
        }
    }

    /**
     * Terminates the given receiver: it is cancelled when no termination error
     * is recorded, and failed with that error otherwise. Anything thrown while
     * doing so is forwarded to the error consumer.
     *
     * @param unicastProcessor the receiver to terminate
     */
    private synchronized void cleanUpSubscriber(UnicastProcessor<?> unicastProcessor) {
        final Throwable recorded = lifecycle.terminationError();
        try {
            if (recorded == null) {
                unicastProcessor.cancel();
            } else {
                unicastProcessor.onError(recorded);
            }
        } catch (Throwable failure) {
            errorConsumer.accept(failure);
        }
    }

    /**
     * Dispatches an incoming frame: stream 0 frames go to
     * {@code handleStreamZero}, all others to {@code handleFrame}. The frame is
     * always released afterwards, even when the handler throws.
     *
     * @param frame the received frame
     */
    private void handleIncomingFrames(Frame frame) {
        try {
            final int streamId = frame.getStreamId();
            final FrameType type = frame.getType();
            if (streamId != 0) {
                handleFrame(streamId, type, frame);
            } else {
                handleStreamZero(type, frame);
            }
        } finally {
            frame.release();
        }
    }

    /**
     * Handles a frame received on stream 0 (the connection-level stream).
     *
     * <ul>
     *   <li>ERROR: the decoded exception is recorded as the termination error,
     *       reported to the error consumer, and the connection is disposed.</li>
     *   <li>LEASE: ignored.</li>
     *   <li>KEEPALIVE: passed to the keep-alive handler when one is configured.</li>
     *   <li>anything else: reported to the error consumer as an
     *       {@link IllegalStateException}; nothing is thrown.</li>
     * </ul>
     *
     * @param frameType the type of {@code frame}
     * @param frame the received frame
     */
    private void handleStreamZero(FrameType frameType, Frame frame) {
        switch (frameType) {
            case ERROR: {
                final RuntimeException error = Exceptions.from(frame);
                this.lifecycle.terminate(error);
                this.errorConsumer.accept(error);
                this.connection.dispose();
                break;
            }
            case LEASE:
                // Lease frames are accepted but not acted upon.
                break;
            case KEEPALIVE:
                if (this.keepAliveHandler != null) {
                    this.keepAliveHandler.receive(frame);
                }
                break;
            default:
                // Report rather than throw: this branch only notifies the error consumer.
                this.errorConsumer.accept(new IllegalStateException("Client received unsupported frame on stream 0: " + frame));
                break;
        }
    }

    /**
     * Handles a frame received on a non-zero stream.
     *
     * <p>When no receiver is registered for the stream the frame is passed to
     * {@code handleMissingResponseProcessor}. Otherwise:
     * <ul>
     *   <li>ERROR: fails the receiver with the decoded exception and unregisters it.</li>
     *   <li>NEXT_COMPLETE: emits the decoded payload, then completes the receiver.</li>
     *   <li>NEXT: emits the decoded payload.</li>
     *   <li>COMPLETE: completes the receiver and unregisters it.</li>
     *   <li>CANCEL: unregisters sender and receiver and cancels the sender, if any.</li>
     *   <li>REQUEST_N: raises the sender's request limit, if a sender exists, and
     *       drains the outgoing queue.</li>
     * </ul>
     *
     * @param i the stream id
     * @param frameType the type of {@code frame}
     * @param frame the received frame
     * @throws IllegalStateException for any other frame type
     */
    private void handleFrame(int i, FrameType frameType, Frame frame) {
        final Subscriber<Payload> receiver = this.receivers.get(i);
        if (receiver == null) {
            handleMissingResponseProcessor(i, frameType, frame);
            return;
        }
        switch (frameType) {
            case ERROR:
                receiver.onError(Exceptions.from(frame));
                this.receivers.remove(i);
                break;
            case NEXT_COMPLETE:
                receiver.onNext(this.frameDecoder.apply(frame));
                receiver.onComplete();
                break;
            case NEXT:
                receiver.onNext(this.frameDecoder.apply(frame));
                break;
            case COMPLETE:
                receiver.onComplete();
                this.receivers.remove(i);
                break;
            case CANCEL: {
                final LimitableRequestPublisher<?> sender = this.senders.remove(i);
                this.receivers.remove(i);
                if (sender != null) {
                    sender.cancel();
                }
                break;
            }
            case REQUEST_N: {
                final LimitableRequestPublisher<?> sender = this.senders.get(i);
                if (sender != null) {
                    sender.increaseRequestLimit(Frame.RequestN.requestN(frame));
                    // Newly permitted elements may be waiting in the outgoing queue.
                    this.sendProcessor.drain();
                }
                break;
            }
            default:
                throw new IllegalStateException("Client received unsupported frame on stream " + i + ": " + frame);
        }
    }

    /**
     * Handles a frame whose stream has no registered receiver. Frames for
     * stream ids that {@code streamIdSupplier.isBeforeOrCurrent} accepts are
     * silently dropped (presumably streams this client already finished —
     * confirm against StreamIdSupplier); any other stream id is a protocol
     * violation.
     *
     * @param i the stream id
     * @param frameType the type of {@code frame}
     * @param frame the received frame
     * @throws IllegalStateException when the stream id is not before-or-current
     */
    private void handleMissingResponseProcessor(int i, FrameType frameType, Frame frame) {
        if (!this.streamIdSupplier.isBeforeOrCurrent(i)) {
            if (frameType == FrameType.ERROR) {
                throw new IllegalStateException("Client received error for non-existent stream: " + i + " Message: " + frame.getDataUtf8());
            }
            throw new IllegalStateException("Client received message for non-existent stream: " + i + ", frame type: " + frameType);
        }
    }
}
