package io.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.CancelFrameFlyweight;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.MetadataPushFrameFlyweight;
import io.rsocket.frame.PayloadFrameFlyweight;
import io.rsocket.frame.RequestChannelFrameFlyweight;
import io.rsocket.frame.RequestFireAndForgetFrameFlyweight;
import io.rsocket.frame.RequestNFrameFlyweight;
import io.rsocket.frame.RequestResponseFrameFlyweight;
import io.rsocket.frame.RequestStreamFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.internal.UnicastMonoProcessor;
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.keepalive.KeepAliveSupport;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/rsocket/RSocketClient.class */
public class RSocketClient implements RSocket {
    // Underlying transport; frames are sent to and received from it, and disposing it closes this client.
    private final DuplexConnection connection;
    // Converts inbound NEXT / NEXT_COMPLETE frames into Payload instances (see handleFrame).
    private final PayloadDecoder payloadDecoder;
    // Sink for errors that cannot be delivered to a specific stream subscriber.
    private final Consumer<Throwable> errorConsumer;
    // Source of stream ids for outgoing requests.
    private final StreamIdSupplier streamIdSupplier;
    // Stream id -> outbound request publisher of a channel; populated only by handleChannel.
    private final Map<Integer, LimitableRequestPublisher> senders;
    // Stream id -> processor that receives inbound payloads/terminal signals for that stream.
    private final Map<Integer, Processor<Payload, Payload>> receivers;
    // Queue of outgoing frames; handed to connection.send(...) in the constructor.
    private final UnboundedProcessor<ByteBuf> sendProcessor;
    // Tracks the termination error; once set, new requests fail with it.
    private final Lifecycle lifecycle;
    // Allocator used to encode every outgoing frame.
    private final ByteBufAllocator allocator;
    // Receives KEEPALIVE frames arriving on stream 0; null when keep-alive is not configured.
    private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/rsocket/RSocketClient$Lifecycle.class */
    public static class Lifecycle {
        private static final AtomicReferenceFieldUpdater<Lifecycle, Throwable> TERMINATION_ERROR = AtomicReferenceFieldUpdater.newUpdater(Lifecycle.class, Throwable.class, "terminationError");
        private volatile Throwable terminationError;

        private Lifecycle() {
        }

        public Mono<Void> active(Runnable runnable) {
            return Mono.create(monoSink -> {
                if (this.terminationError != null) {
                    monoSink.error(this.terminationError);
                } else {
                    runnable.run();
                    monoSink.success();
                }
            });
        }

        public <T> Mono<T> activeMono(Supplier<? extends Mono<? extends T>> supplier) {
            return Mono.defer(() -> {
                return this.terminationError == null ? (Mono) supplier.get() : Mono.error(this.terminationError);
            });
        }

        public <T> Flux<T> activeFlux(Supplier<? extends Flux<T>> supplier) {
            return Flux.defer(() -> {
                return this.terminationError == null ? (Publisher) supplier.get() : Flux.error(this.terminationError);
            });
        }

        public Throwable getTerminationError() {
            return this.terminationError;
        }

        public void setTerminationError(Throwable th) {
            TERMINATION_ERROR.compareAndSet(this, null, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a client over {@code duplexConnection} and subscribes to its three pipelines:
     * the close signal (which triggers {@link #terminate()}), the outgoing frame queue, and
     * the incoming frame stream. Keep-alive is started only when {@code i} is non-zero and a
     * handler is supplied.
     *
     * @param byteBufAllocator allocator used for every outgoing frame
     * @param duplexConnection transport to send frames to and receive frames from
     * @param payloadDecoder decoder applied to inbound payload frames
     * @param consumer receives errors that cannot be routed to a stream subscriber
     * @param streamIdSupplier source of stream ids for new requests
     * @param i first keep-alive setting passed to {@code ClientKeepAliveSupport}; {@code 0}
     *     disables keep-alive (presumably the tick period - confirm against KeepAliveSupport)
     * @param i2 second keep-alive setting passed to {@code ClientKeepAliveSupport}
     *     (presumably the ack timeout - confirm against KeepAliveSupport)
     * @param keepAliveHandler starts keep-alive handling; {@code null} disables keep-alive
     */
    public RSocketClient(ByteBufAllocator byteBufAllocator, DuplexConnection duplexConnection, PayloadDecoder payloadDecoder, Consumer<Throwable> consumer, StreamIdSupplier streamIdSupplier, int i, int i2, KeepAliveHandler keepAliveHandler) {
        this.lifecycle = new Lifecycle();
        this.allocator = byteBufAllocator;
        this.connection = duplexConnection;
        this.payloadDecoder = payloadDecoder;
        this.errorConsumer = consumer;
        this.streamIdSupplier = streamIdSupplier;
        this.senders = Collections.synchronizedMap(new IntObjectHashMap<>());
        this.receivers = Collections.synchronizedMap(new IntObjectHashMap<>());
        this.sendProcessor = new UnboundedProcessor<>();

        // Whatever way the connection closes, fail all in-flight streams.
        duplexConnection.onClose().doFinally(signalType -> {
            terminate();
        }).subscribe((Consumer) null, consumer);
        // Drain the outgoing queue into the connection; errors/cancellation fan out to the streams.
        duplexConnection.send(this.sendProcessor).doFinally(this::handleSendProcessorCancel).subscribe((Consumer) null, this::handleSendProcessorError);
        duplexConnection.receive().subscribe(this::handleIncomingFrames, consumer);

        if (i != 0 && keepAliveHandler != null) {
            KeepAliveSupport.ClientKeepAliveSupport clientKeepAliveSupport = new KeepAliveSupport.ClientKeepAliveSupport(byteBufAllocator, i, i2);
            // Keep-alive frames go out through the same queue as every other frame.
            this.keepAliveFramesAcceptor = keepAliveHandler.start(clientKeepAliveSupport, this.sendProcessor::onNext, this::terminate);
        } else {
            this.keepAliveFramesAcceptor = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a client without keep-alive: delegates to the full constructor with both
     * keep-alive settings set to {@code 0} and a {@code null} keep-alive handler.
     */
    public RSocketClient(ByteBufAllocator byteBufAllocator, DuplexConnection duplexConnection, PayloadDecoder payloadDecoder, Consumer<Throwable> consumer, StreamIdSupplier streamIdSupplier) {
        this(byteBufAllocator, duplexConnection, payloadDecoder, consumer, streamIdSupplier, 0, 0, null);
    }

    /**
     * Keep-alive timeout callback: records a {@link ConnectionErrorException} as the
     * termination error, reports it to the error consumer and disposes the connection.
     *
     * @param keepAlive keep-alive state whose timeout is included in the error message
     */
    private void terminate(KeepAliveSupport.KeepAlive keepAlive) {
        final long timeoutMillis = keepAlive.getTimeout().toMillis();
        final String message = String.format("No keep-alive acks for %d ms", timeoutMillis);
        final ConnectionErrorException timeoutError = new ConnectionErrorException(message);
        this.lifecycle.setTerminationError(timeoutError);
        this.errorConsumer.accept(timeoutError);
        this.connection.dispose();
    }

    /**
     * Error callback of the outgoing-frame subscription. Every registered receiver is failed
     * with the recorded termination error if there is one, otherwise with {@code th}; every
     * registered sender is cancelled.
     *
     * @param th the error signalled by the send pipeline
     */
    private void handleSendProcessorError(Throwable th) {
        final Throwable recorded = this.lifecycle.getTerminationError();
        final Throwable cause;
        if (recorded == null) {
            cause = th;
        } else {
            cause = recorded;
        }
        this.receivers.values().forEach(receiver -> {
            try {
                receiver.onError(cause);
            } catch (Throwable failure) {
                // A misbehaving subscriber must not stop the remaining receivers from being notified.
                this.errorConsumer.accept(failure);
            }
        });
        this.senders.values().forEach(LimitableRequestPublisher::cancel);
    }

    /**
     * Finally-callback of the outgoing-frame subscription. For any terminal signal other than
     * {@code ON_ERROR} (which {@link #handleSendProcessorError(Throwable)} handles), fails every
     * registered receiver with a "closed connection" error and cancels every registered sender.
     *
     * @param signalType the signal that terminated the send pipeline
     */
    private void handleSendProcessorCancel(SignalType signalType) {
        if (SignalType.ON_ERROR != signalType) {
            this.receivers.values().forEach(receiver -> {
                try {
                    receiver.onError(new Throwable("closed connection"));
                } catch (Throwable failure) {
                    // Keep notifying the remaining receivers even if one of them throws.
                    this.errorConsumer.accept(failure);
                }
            });
            this.senders.values().forEach(LimitableRequestPublisher::cancel);
        }
    }

    /**
     * Sends a fire-and-forget request; delegates to {@code handleFireAndForget}.
     *
     * @param payload the request payload
     * @return a Mono that completes once the request frame has been queued, or errors if the client is terminated
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        return handleFireAndForget(payload);
    }

    /**
     * Sends a request-response request; delegates to {@code handleRequestResponse}.
     *
     * @param payload the request payload
     * @return a Mono signalling the response received for the request's stream
     */
    @Override // io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        return handleRequestResponse(payload);
    }

    /**
     * Opens a request-stream interaction; delegates to {@code handleRequestStream}.
     *
     * @param payload the request payload
     * @return a Flux of the payloads received for the request's stream
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        return handleRequestStream(payload);
    }

    /**
     * Opens a request-channel interaction; adapts {@code publisher} to a Flux and delegates
     * to {@code handleChannel}.
     *
     * @param publisher source of the outbound payloads
     * @return a Flux of the payloads received for the channel's stream
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return handleChannel(Flux.from(publisher));
    }

    /**
     * Sends a metadata-push frame; delegates to {@code handleMetadataPush}.
     *
     * @param payload payload whose metadata is pushed
     * @return a Mono that completes once the frame has been queued, or errors if the client is terminated
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> metadataPush(Payload payload) {
        return handleMetadataPush(payload);
    }

    /** @return the availability value reported by the underlying connection */
    @Override // io.rsocket.RSocket, io.rsocket.Availability
    public double availability() {
        return this.connection.availability();
    }

    /** Disposes the underlying connection. */
    public void dispose() {
        this.connection.dispose();
    }

    /** @return whether the underlying connection reports itself as disposed */
    public boolean isDisposed() {
        return this.connection.isDisposed();
    }

    /** @return the underlying connection's {@code onClose()} Mono */
    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.connection.onClose();
    }

    /**
     * Builds the fire-and-forget Mono. On subscription, while the client is active, it
     * allocates a stream id, encodes the request frame from retained slices of the payload,
     * releases the payload and queues the frame for sending.
     *
     * @param payload the request payload; released once the frame has been encoded
     * @return a Mono completing after the frame is queued, or failing with the termination error
     */
    private Mono<Void> handleFireAndForget(Payload payload) {
        return this.lifecycle.active(() -> {
            final int streamId = this.streamIdSupplier.nextStreamId();
            ByteBuf metadata = null;
            if (payload.hasMetadata()) {
                metadata = payload.sliceMetadata().retain();
            }
            final ByteBuf data = payload.sliceData().retain();
            final ByteBuf frame = RequestFireAndForgetFrameFlyweight.encode(this.allocator, streamId, false, metadata, data);
            payload.release();
            this.sendProcessor.onNext(frame);
        });
    }

    /**
     * Builds the request-response Mono. On subscription, while the client is active, it
     * allocates a stream id, encodes and queues the request frame, and registers a receiver
     * for the response under that stream id.
     *
     * <p>An error on the response Mono queues an ERROR frame for the stream; cancellation
     * queues a CANCEL frame. The receiver is unregistered on any termination.
     *
     * @param payload the request payload; released once the frame has been encoded
     * @return a Mono signalling the response, or failing with the termination error
     */
    private Mono<Payload> handleRequestResponse(Payload payload) {
        return this.lifecycle.activeMono(() -> {
            final int streamId = this.streamIdSupplier.nextStreamId();
            final Integer key = Integer.valueOf(streamId);
            final UnboundedProcessor<ByteBuf> outbound = this.sendProcessor;

            final ByteBuf metadata = payload.sliceMetadata().retain();
            final ByteBuf data = payload.sliceData().retain();
            final ByteBuf requestFrame = RequestResponseFrameFlyweight.encode(this.allocator, streamId, false, metadata, data);
            payload.release();

            final UnicastMonoProcessor<Payload> receiver = UnicastMonoProcessor.create();
            // Register before sending so that a fast response finds its receiver.
            this.receivers.put(key, receiver);
            outbound.onNext(requestFrame);

            return receiver
                    .doOnError(error -> outbound.onNext(ErrorFrameFlyweight.encode(this.allocator, streamId, error)))
                    .doFinally(signal -> {
                        if (SignalType.CANCEL == signal) {
                            outbound.onNext(CancelFrameFlyweight.encode(this.allocator, streamId));
                        }
                        this.receivers.remove(key);
                    });
        });
    }

    /**
     * Builds the request-stream Flux. On subscription, while the client is active, it
     * allocates a stream id and registers a receiver under it. The REQUEST_STREAM frame is
     * sent lazily, on the first downstream request, and carries that request's demand.
     *
     * <p>Later downstream requests are forwarded as REQUEST_N frames, an error queues an
     * ERROR frame and cancellation queues a CANCEL frame - each only while the stream is
     * still registered and the receiver is not disposed. The receiver is unregistered on any
     * termination.
     *
     * @param payload the request payload; released when the REQUEST_STREAM frame is encoded
     * @return a Flux of the payloads received on the stream, or failing with the termination error
     */
    private Flux<Payload> handleRequestStream(Payload payload) {
        return this.lifecycle.activeFlux(() -> {
            final int nextStreamId = this.streamIdSupplier.nextStreamId();
            final UnboundedProcessor<ByteBuf> unboundedProcessor = this.sendProcessor;
            // Must be the concrete UnicastProcessor type: doOnRequest/isDisposed are not on Processor.
            final UnicastProcessor<Payload> create = UnicastProcessor.create();
            this.receivers.put(Integer.valueOf(nextStreamId), create);
            return create.doOnRequest(new LongConsumer() {
                // True until the REQUEST_STREAM frame has been sent.
                boolean firstRequest = true;

                @Override // java.util.function.LongConsumer
                public void accept(long j) {
                    if (this.firstRequest && !create.isDisposed()) {
                        this.firstRequest = false;
                        // The initial demand travels inside the REQUEST_STREAM frame itself.
                        unboundedProcessor.onNext(RequestStreamFrameFlyweight.encode(RSocketClient.this.allocator, nextStreamId, false, j, payload.sliceMetadata().retain(), payload.sliceData().retain()));
                        payload.release();
                    } else {
                        if (!RSocketClient.this.contains(nextStreamId) || create.isDisposed()) {
                            return;
                        }
                        unboundedProcessor.onNext(RequestNFrameFlyweight.encode(RSocketClient.this.allocator, nextStreamId, j));
                    }
                }
            }).doOnError(th -> {
                if (!contains(nextStreamId) || create.isDisposed()) {
                    return;
                }
                unboundedProcessor.onNext(ErrorFrameFlyweight.encode(this.allocator, nextStreamId, th));
            }).doOnCancel(() -> {
                if (!contains(nextStreamId) || create.isDisposed()) {
                    return;
                }
                unboundedProcessor.onNext(CancelFrameFlyweight.encode(this.allocator, nextStreamId));
            }).doFinally(signalType -> {
                this.receivers.remove(Integer.valueOf(nextStreamId));
            });
        });
    }

    /**
     * Builds the request-channel Flux. On subscription, while the client is active, it
     * allocates a stream id and creates the inbound receiver. Nothing is sent until the first
     * downstream request arrives.
     *
     * <p>On the first downstream request the outbound {@code flux} is wrapped in a
     * {@link LimitableRequestPublisher}, sender and receiver are registered under the stream
     * id, and the wrapped publisher is subscribed to. Its first payload is sent as a
     * REQUEST_CHANNEL frame carrying the initial downstream demand; later payloads are sent as
     * PAYLOAD frames. Later downstream requests are forwarded as REQUEST_N frames.
     *
     * <p>An error on the returned Flux queues an ERROR frame and cancellation queues a CANCEL
     * frame, each only while the stream is registered and the receiver is not disposed. On
     * any termination the receiver is unregistered and the sender is removed and cancelled.
     *
     * @param flux source of the outbound payloads; each payload is released after being encoded
     * @return a Flux of the payloads received on the channel, or failing with the termination error
     */
    private Flux<Payload> handleChannel(Flux<Payload> flux) {
        return this.lifecycle.activeFlux(() -> {
            final UnboundedProcessor<ByteBuf> unboundedProcessor = this.sendProcessor;
            final UnicastProcessor<Payload> create = UnicastProcessor.create();
            final int nextStreamId = this.streamIdSupplier.nextStreamId();
            return create.doOnRequest(new LongConsumer() {
                // True until the outbound publisher has been subscribed to.
                boolean firstRequest = true;

                @Override // java.util.function.LongConsumer
                public void accept(final long j) {
                    if (!this.firstRequest) {
                        if (!RSocketClient.this.contains(nextStreamId) || create.isDisposed()) {
                            return;
                        }
                        unboundedProcessor.onNext(RequestNFrameFlyweight.encode(RSocketClient.this.allocator, nextStreamId, j));
                        return;
                    }
                    this.firstRequest = false;
                    flux.transform(source -> {
                        LimitableRequestPublisher<Payload> wrap = LimitableRequestPublisher.wrap(source);
                        // Ask for exactly one payload up front (presumably so the first frame can be
                        // built without waiting for REQUEST_N from the peer - confirm).
                        wrap.request(1L);
                        RSocketClient.this.senders.put(Integer.valueOf(nextStreamId), wrap);
                        RSocketClient.this.receivers.put(Integer.valueOf(nextStreamId), create);
                        return wrap;
                    }).subscribe(new BaseSubscriber<Payload>() {
                        // True until the REQUEST_CHANNEL frame has been sent.
                        boolean firstPayload = true;

                        @Override
                        public void hookOnNext(Payload payload) {
                            ByteBuf encode;
                            if (this.firstPayload) {
                                this.firstPayload = false;
                                // The first payload opens the channel and carries the initial demand j.
                                encode = RequestChannelFrameFlyweight.encode(RSocketClient.this.allocator, nextStreamId, false, false, j, payload.sliceMetadata().retain(), payload.sliceData().retain());
                            } else {
                                encode = PayloadFrameFlyweight.encode(RSocketClient.this.allocator, nextStreamId, false, false, true, payload);
                            }
                            unboundedProcessor.onNext(encode);
                            payload.release();
                        }

                        @Override
                        protected void hookOnComplete() {
                            if (RSocketClient.this.contains(nextStreamId) && !create.isDisposed()) {
                                unboundedProcessor.onNext(PayloadFrameFlyweight.encodeComplete(RSocketClient.this.allocator, nextStreamId));
                            }
                            // The outbound side finished without emitting anything, so no
                            // REQUEST_CHANNEL frame was sent: complete the inbound side locally.
                            if (this.firstPayload) {
                                create.onComplete();
                            }
                        }

                        @Override
                        protected void hookOnError(Throwable th) {
                            RSocketClient.this.errorConsumer.accept(th);
                            create.dispose();
                        }
                    });
                }
            }).doOnError(th -> {
                if (!contains(nextStreamId) || create.isDisposed()) {
                    return;
                }
                unboundedProcessor.onNext(ErrorFrameFlyweight.encode(this.allocator, nextStreamId, th));
            }).doOnCancel(() -> {
                if (!contains(nextStreamId) || create.isDisposed()) {
                    return;
                }
                unboundedProcessor.onNext(CancelFrameFlyweight.encode(this.allocator, nextStreamId));
            }).doFinally(signalType -> {
                this.receivers.remove(Integer.valueOf(nextStreamId));
                LimitableRequestPublisher remove = this.senders.remove(Integer.valueOf(nextStreamId));
                if (remove != null) {
                    remove.cancel();
                }
            });
        });
    }

    /**
     * Builds the metadata-push Mono. On subscription, while the client is active, it encodes
     * a METADATA_PUSH frame from a retained slice of the payload's metadata, releases the
     * payload and queues the frame for sending.
     *
     * @param payload payload whose metadata is pushed; released once the frame has been encoded
     * @return a Mono completing after the frame is queued, or failing with the termination error
     */
    private Mono<Void> handleMetadataPush(Payload payload) {
        return this.lifecycle.active(() -> {
            ByteBuf frame = MetadataPushFrameFlyweight.encode(this.allocator, payload.sliceMetadata().retain());
            // The retained slice keeps the metadata alive; drop our reference to the payload
            // itself, as the other request paths do, so it is not leaked.
            payload.release();
            this.sendProcessor.onNext(frame);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether a receiver is currently registered for the given stream.
     *
     * @param i the stream id
     * @return {@code true} if {@code receivers} has an entry for the stream id
     */
    public boolean contains(int i) {
        return this.receivers.containsKey(Integer.valueOf(i));
    }

    /**
     * Shuts the client down after the connection has closed. Records a
     * {@link ClosedChannelException} as the termination error (a no-op if an error was already
     * recorded), fails every registered receiver, cancels every registered sender, then clears
     * both registries and disposes the outgoing frame queue.
     */
    protected void terminate() {
        this.lifecycle.setTerminationError(new ClosedChannelException());
        try {
            this.receivers.values().forEach(this::cleanUpSubscriber);
            this.senders.values().forEach(this::cleanUpLimitableRequestPublisher);
        } finally {
            // Runs even if notifying a stream threw, so no registry entries or queue outlive the connection.
            this.senders.clear();
            this.receivers.clear();
            this.sendProcessor.dispose();
        }
    }

    /**
     * Cancels one outbound publisher during termination; anything it throws is handed to
     * the error consumer instead of propagating.
     *
     * @param limitableRequestPublisher the sender to cancel
     */
    private synchronized void cleanUpLimitableRequestPublisher(LimitableRequestPublisher<?> limitableRequestPublisher) {
        try {
            limitableRequestPublisher.cancel();
        } catch (Throwable failure) {
            // Best effort: report the failure and let the caller continue with the other senders.
            this.errorConsumer.accept(failure);
        }
    }

    /**
     * Fails one receiver with the recorded termination error during termination; anything
     * it throws is handed to the error consumer instead of propagating.
     *
     * @param processor the receiver to fail
     */
    private synchronized void cleanUpSubscriber(Processor processor) {
        final Throwable cause = this.lifecycle.getTerminationError();
        try {
            processor.onError(cause);
        } catch (Throwable failure) {
            // Best effort: report the failure and let the caller continue with the other receivers.
            this.errorConsumer.accept(failure);
        }
    }

    /**
     * Entry point for every inbound frame. Reads the stream id and frame type from the frame
     * header and routes the frame to the stream-0 handler or to the per-stream handler, then
     * releases the frame.
     *
     * @param byteBuf the inbound frame; always released by this method
     * @throws RuntimeException whatever the handlers threw, propagated via Reactor after the
     *     frame has been safely released
     */
    private void handleIncomingFrames(ByteBuf byteBuf) {
        try {
            final int id = FrameHeaderFlyweight.streamId(byteBuf);
            final FrameType type = FrameHeaderFlyweight.frameType(byteBuf);
            if (id != 0) {
                handleFrame(id, type, byteBuf);
            } else {
                handleStreamZero(type, byteBuf);
            }
            byteBuf.release();
        } catch (Throwable failure) {
            // safeRelease never throws, so the original failure is the one that propagates.
            ReferenceCountUtil.safeRelease(byteBuf);
            throw Exceptions.propagate(failure);
        }
    }

    /**
     * Handles a connection-level frame (stream id 0).
     *
     * <ul>
     *   <li>ERROR: decodes the error, records it as the termination error, reports it to the
     *       error consumer and disposes the connection.
     *   <li>LEASE: ignored.
     *   <li>KEEPALIVE: forwarded to the keep-alive acceptor when keep-alive is configured.
     *   <li>anything else: reported to the error consumer as unsupported; nothing is thrown.
     * </ul>
     *
     * @param frameType the type read from the frame header
     * @param byteBuf the inbound frame; not released here
     */
    private void handleStreamZero(FrameType frameType, ByteBuf byteBuf) {
        switch (frameType) {
            case ERROR:
                RuntimeException from = io.rsocket.exceptions.Exceptions.from(byteBuf);
                this.lifecycle.setTerminationError(from);
                this.errorConsumer.accept(from);
                this.connection.dispose();
                return;
            case LEASE:
                return;
            case KEEPALIVE:
                if (this.keepAliveFramesAcceptor != null) {
                    this.keepAliveFramesAcceptor.receive(byteBuf);
                }
                return;
            default:
                // Reported rather than thrown, so an unexpected frame does not fail the receive pipeline.
                this.errorConsumer.accept(new IllegalStateException("Client received unsupported frame on stream 0: " + byteBuf.toString()));
                return;
        }
    }

    /**
     * Handles a frame addressed to a non-zero stream by dispatching on its type to the
     * receiver or sender registered for that stream.
     *
     * <ul>
     *   <li>ERROR: fails the receiver with the decoded error and unregisters it.
     *   <li>NEXT: emits the decoded payload to the receiver.
     *   <li>NEXT_COMPLETE: emits the decoded payload, then completes the receiver.
     *   <li>COMPLETE: completes the receiver and unregisters it.
     *   <li>CANCEL: removes and cancels the stream's sender, if any.
     *   <li>REQUEST_N: forwards the demand to the stream's sender, if any; a value of
     *       {@code Integer.MAX_VALUE} is treated as unbounded.
     * </ul>
     *
     * If no receiver is registered, the frame goes to {@code handleMissingResponseProcessor}.
     *
     * @param i the stream id read from the frame header
     * @param frameType the frame type read from the frame header
     * @param byteBuf the inbound frame; not released here
     * @throws IllegalStateException if the frame type is not one the client handles on a stream
     */
    private void handleFrame(int i, FrameType frameType, ByteBuf byteBuf) {
        final Integer key = Integer.valueOf(i);
        Subscriber<Payload> subscriber = this.receivers.get(key);
        if (subscriber == null) {
            handleMissingResponseProcessor(i, frameType, byteBuf);
            return;
        }
        switch (frameType) {
            case ERROR:
                subscriber.onError(io.rsocket.exceptions.Exceptions.from(byteBuf));
                this.receivers.remove(key);
                return;
            case NEXT_COMPLETE:
                subscriber.onNext(this.payloadDecoder.apply(byteBuf));
                subscriber.onComplete();
                return;
            case CANCEL:
                LimitableRequestPublisher remove = this.senders.remove(key);
                if (remove != null) {
                    remove.cancel();
                }
                return;
            case NEXT:
                subscriber.onNext(this.payloadDecoder.apply(byteBuf));
                return;
            case REQUEST_N:
                LimitableRequestPublisher limitableRequestPublisher = this.senders.get(key);
                if (limitableRequestPublisher != null) {
                    int requestN = RequestNFrameFlyweight.requestN(byteBuf);
                    // Integer.MAX_VALUE on the wire maps to Reactive Streams' unbounded demand.
                    limitableRequestPublisher.request(requestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : requestN);
                }
                return;
            case COMPLETE:
                subscriber.onComplete();
                this.receivers.remove(key);
                return;
            default:
                throw new IllegalStateException("Client received unsupported frame on stream " + i + ": " + byteBuf.toString());
        }
    }

    /**
     * Handles a frame whose stream has no registered receiver. If the stream id supplier
     * reports the id as before-or-current, the frame is silently dropped; otherwise the id
     * was never issued by this client and an exception is raised.
     *
     * @param i the stream id of the frame
     * @param frameType the frame type read from the frame header
     * @param byteBuf the inbound frame; only read when it is an ERROR frame
     * @throws IllegalStateException if the stream id is not before-or-current
     */
    private void handleMissingResponseProcessor(int i, FrameType frameType, ByteBuf byteBuf) {
        if (!this.streamIdSupplier.isBeforeOrCurrent(i)) {
            final String message;
            if (frameType == FrameType.ERROR) {
                message = "Client received error for non-existent stream: " + i + " Message: " + ErrorFrameFlyweight.dataUtf8(byteBuf);
            } else {
                message = "Client received message for non-existent stream: " + i + ", frame type: " + frameType;
            }
            throw new IllegalStateException(message);
        }
    }
}
