package io.reactivesocket;

import io.netty.buffer.Unpooled;
import io.netty.util.collection.IntObjectHashMap;
import io.reactivesocket.Frame;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.exceptions.Exceptions;
import io.reactivesocket.frame.FrameHeaderFlyweight;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.LimitableRequestPublisher;
import io.reactivesocket.lease.Lease;
import io.reactivesocket.lease.LeaseImpl;
import io.reactivesocket.util.PayloadImpl;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:io/reactivesocket/ClientReactiveSocket.class */
public class ClientReactiveSocket implements ReactiveSocket {
    /** Shared exception instance handed to every still-registered receiver by {@code cleanUpSubscriber}. */
    private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    /** Connection used for all frame sends and receives, and for availability/close delegation. */
    private final DuplexConnection connection;
    /** Error sink; the constructor wraps the caller's consumer in a {@code KnownErrorFilter}. */
    private final Consumer<Throwable> errorConsumer;
    /** Source of stream ids for outgoing requests. */
    private final StreamIdSupplier streamIdSupplier;
    /** Supplies keep-alive ticks (see {@code start}) and is acked from {@code handleStreamZero}. */
    private final KeepAliveProvider keepAliveProvider;
    /** Completed in {@code start} once the receive stream is subscribed; requests are chained after it. */
    private final MonoProcessor<Void> started = MonoProcessor.create();
    // Outgoing request publishers keyed by stream id. Writes in this class are done while holding this object's monitor.
    // NOTE(review): FrameHeaderFlyweight.FLAGS_M is passed as the first constructor argument (presumably the
    // initial capacity); this looks like a decompiler substituting a named constant for a numeric literal — confirm.
    private final IntObjectHashMap<LimitableRequestPublisher> senders = new IntObjectHashMap<>(FrameHeaderFlyweight.FLAGS_M, 0.9f);
    // Response subscribers keyed by stream id; the same FLAGS_M note as for senders applies.
    private final IntObjectHashMap<Subscriber<Payload>> receivers = new IntObjectHashMap<>(FrameHeaderFlyweight.FLAGS_M, 0.9f);
    /** Subscription of the keep-alive send created in {@code start}; disposed in {@code cleanup}. May be null before start. */
    private Disposable keepAliveSendSub;
    /** Callback for LEASE frames on stream 0; assigned in {@code start}, may be null. */
    private volatile Consumer<Lease> leaseConsumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.reactivesocket.ClientReactiveSocket$1, reason: invalid class name */
    /* loaded from: input_file:io/reactivesocket/ClientReactiveSocket$1.class */
    public class AnonymousClass1 implements Supplier<Publisher<Payload>> {
        /** Stream id reserved for this request; assigned in the constructor from the socket's StreamIdSupplier. */
        final int streamId;
        /** Handle on the outgoing-request send started on first demand in {@code get()}; cancelled on downstream cancel. Null until then. */
        volatile MonoProcessor<Void> subscribedRequests;
        /** Captured request payload stream (compiler-generated capture of the enclosing method's argument). */
        final /* synthetic */ Flux val$request;
        /** Captured frame type of the request (compiler-generated capture of the enclosing method's argument). */
        final /* synthetic */ FrameType val$requestType;
        /** Processor that is registered in {@code receivers} and returned (decorated) from {@code get()}. */
        final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
        /** True until the first downstream request(n) is observed; flipped under the outer socket's monitor. */
        boolean firstRequest = true;

        /**
         * Captures the request stream and its frame type, and reserves the next stream id
         * from the enclosing socket's {@code StreamIdSupplier}.
         *
         * @param flux      outgoing request payloads
         * @param frameType frame type used for the initial request frame
         */
        AnonymousClass1(Flux flux, FrameType frameType) {
            // The id reservation is independent of the two captured values.
            streamId = ClientReactiveSocket.this.streamIdSupplier.nextStreamId();
            val$requestType = frameType;
            val$request = flux;
        }

        /**
         * Tells whether a frame for this stream may still be written.
         *
         * @return true only if the stream is still registered as a receiver, the connection
         *         reports positive availability, and the local receiver has not terminated
         */
        boolean isValidToSendFrame() {
            if (!ClientReactiveSocket.this.contains(this.streamId)) {
                return false;
            }
            if (ClientReactiveSocket.this.connection.availability() <= 0.0d) {
                return false;
            }
            return !this.receiver.isTerminated();
        }

        /**
         * Sends a single frame for this stream if {@link #isValidToSendFrame()} allows it;
         * otherwise the frame is silently skipped. Send failures are reported to the
         * enclosing socket's error consumer.
         *
         * @param frame frame to write to the connection
         */
        void sendOneFrame(Frame frame) {
            if (!isValidToSendFrame()) {
                return;
            }
            // The decompiled "r1.accept(v1)" lambda was a method reference on the error consumer.
            ClientReactiveSocket.this.connection
                .sendOne(frame)
                .doOnError(ClientReactiveSocket.this.errorConsumer::accept)
                .subscribe();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        /**
         * Builds the response stream for this request.
         *
         * <p>Nothing is sent until the first downstream {@code request(n)}. That first demand subscribes
         * the outgoing request flux to the connection: the first outgoing payload becomes the request
         * frame (carrying that initial {@code n}) and later payloads become NEXT frames. Every later
         * demand is forwarded as a REQUEST_N frame. Downstream errors and cancellation are forwarded as
         * ERROR / CANCEL frames through {@code sendOneFrame}, and the stream's receiver and sender
         * registrations are removed when the stream finishes.
         *
         * @return the decorated receiver processor for this stream
         */
        @Override // java.util.function.Supplier
        public Publisher<Payload> get() {
            UnicastProcessor<Payload> unicastProcessor = this.receiver;
            Flux flux = this.val$request;
            FrameType frameType = this.val$requestType;
            return unicastProcessor.doOnRequest(j -> {
                // Decide, under the socket's monitor, whether this is the very first demand signal.
                boolean z = false;
                synchronized (ClientReactiveSocket.this) {
                    if (this.firstRequest) {
                        z = true;
                        this.firstRequest = false;
                    }
                }
                if (!z) {
                    // Subsequent demand: forward it to the peer as REQUEST_N.
                    sendOneFrame(Frame.RequestN.from(this.streamId, j));
                } else {
                    // First demand: start sending the request stream over the connection.
                    this.subscribedRequests = ClientReactiveSocket.this.connection.send(flux.transform(flux2 -> {
                        // Wrap the request flux and register both directions of the stream under its id.
                        LimitableRequestPublisher wrap = LimitableRequestPublisher.wrap(flux2);
                        synchronized (ClientReactiveSocket.this) {
                            ClientReactiveSocket.this.senders.put(this.streamId, wrap);
                            ClientReactiveSocket.this.receivers.put(this.streamId, this.receiver);
                        }
                        return wrap;
                    }).map(new Function<Payload, Frame>() { // from class: io.reactivesocket.ClientReactiveSocket.1.1
                        // True until the first payload has been mapped.
                        boolean firstPayload = true;

                        @Override // java.util.function.Function
                        public Frame apply(Payload payload) {
                            boolean z2 = false;
                            synchronized (this) {
                                if (this.firstPayload) {
                                    this.firstPayload = false;
                                    z2 = true;
                                }
                            }
                            // First payload -> request frame with the initial demand j; all others -> NEXT frame.
                            return z2 ? Frame.Request.from(AnonymousClass1.this.streamId, frameType, payload, j) : Frame.PayloadFrame.from(AnonymousClass1.this.streamId, FrameType.NEXT, payload);
                        }
                    }).doOnComplete(() -> {
                        // Only a request channel announces completion of the outgoing side.
                        if (FrameType.REQUEST_CHANNEL == frameType) {
                            sendOneFrame(Frame.PayloadFrame.from(this.streamId, FrameType.COMPLETE));
                        }
                    })).doOnError(th -> {
                        // A failed send is reported and the local receiver is cancelled.
                        ClientReactiveSocket.this.errorConsumer.accept(th);
                        this.receiver.cancel();
                    }).subscribe();
                }
            }).doOnError(th -> {
                sendOneFrame(Frame.Error.from(this.streamId, th));
            }).doOnCancel(() -> {
                sendOneFrame(Frame.Cancel.from(this.streamId));
                // Stop the outgoing request send if it was ever started.
                if (this.subscribedRequests != null) {
                    this.subscribedRequests.cancel();
                }
            }).doFinally(signalType -> {
                // Deregister the stream regardless of how it ended.
                ClientReactiveSocket.this.removeReceiver(this.streamId);
                ClientReactiveSocket.this.removeSender(this.streamId);
            });
        }
    }

    /**
     * Creates a client socket on top of the given connection and arranges for
     * {@link #cleanup()} to run when the connection closes.
     *
     * @param duplexConnection  connection used for all frame traffic
     * @param consumer          error callback; wrapped in a {@code KnownErrorFilter} for errors
     *                          reported by this socket, and used directly for errors signalled by
     *                          the connection's {@code onClose()}
     * @param streamIdSupplier  source of stream ids for outgoing requests
     * @param keepAliveProvider source of keep-alive ticks and receiver of keep-alive acks
     */
    public ClientReactiveSocket(DuplexConnection duplexConnection, Consumer<Throwable> consumer, StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider) {
        this.connection = duplexConnection;
        this.errorConsumer = new KnownErrorFilter(consumer);
        this.streamIdSupplier = streamIdSupplier;
        this.keepAliveProvider = keepAliveProvider;
        // The decompiled "r1.accept(v1)" lambda was a method reference on the constructor argument;
        // evaluating the reference also keeps the implicit null check the decompiler showed as getClass().
        duplexConnection.onClose()
            .doFinally(signalType -> cleanup())
            .doOnError(consumer::accept)
            .subscribe();
    }

    /**
     * Sends a FIRE_AND_FORGET request once the socket has been started.
     * The stream id is reserved and the frame built only when the returned Mono is subscribed.
     *
     * @param payload request payload
     * @return the Mono returned by the connection's single-frame send
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Mono<Void> fireAndForget(Payload payload) {
        Mono<Void> deferredSend = Mono.defer(() -> {
            int streamId = this.streamIdSupplier.nextStreamId();
            Frame requestFrame = Frame.Request.from(streamId, FrameType.FIRE_AND_FORGET, payload, 1);
            return this.connection.sendOne(requestFrame);
        });
        return this.started.then(deferredSend);
    }

    /**
     * Issues a request/response interaction.
     *
     * @param payload request payload
     * @return Mono produced by {@code handleRequestResponse} for this payload
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Mono<Payload> requestResponse(Payload payload) {
        Mono<Payload> response = handleRequestResponse(payload);
        return response;
    }

    /**
     * Issues a REQUEST_STREAM interaction whose outgoing side is the single given payload.
     *
     * @param payload request payload
     * @return Flux produced by {@code handleStreamResponse}
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Flux<Payload> requestStream(Payload payload) {
        Flux<Payload> singleRequest = Flux.just(payload);
        return handleStreamResponse(singleRequest, FrameType.REQUEST_STREAM);
    }

    /**
     * Issues a REQUEST_CHANNEL interaction fed by the given publisher.
     *
     * @param publisher outgoing request payloads
     * @return Flux produced by {@code handleStreamResponse}
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        Flux<Payload> requests = Flux.from(publisher);
        return handleStreamResponse(requests, FrameType.REQUEST_CHANNEL);
    }

    /**
     * Sends a METADATA_PUSH frame on stream 0. Unlike the other request methods in this
     * class, the send is not chained after {@code started}.
     *
     * @param payload payload to push
     * @return the Mono returned by the connection's single-frame send
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Mono<Void> metadataPush(Payload payload) {
        Frame pushFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 1);
        return this.connection.sendOne(pushFrame);
    }

    /**
     * Reports the availability of the underlying connection.
     *
     * @return the value of {@code connection.availability()}
     */
    @Override // io.reactivesocket.ReactiveSocket, io.reactivesocket.Availability
    public double availability() {
        double connectionAvailability = this.connection.availability();
        return connectionAvailability;
    }

    /**
     * Closes the socket by delegating to the underlying connection.
     *
     * @return the Mono returned by {@code connection.close()}
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Mono<Void> close() {
        Mono<Void> connectionClose = this.connection.close();
        return connectionClose;
    }

    /**
     * Exposes the close signal of the underlying connection.
     *
     * @return the Mono returned by {@code connection.onClose()}
     */
    @Override // io.reactivesocket.ReactiveSocket
    public Mono<Void> onClose() {
        Mono<Void> connectionClosed = this.connection.onClose();
        return connectionClosed;
    }

    /**
     * Starts the socket: stores the lease callback, begins sending a keep-alive frame for
     * every tick of the keep-alive provider, and subscribes to incoming frames. The
     * {@code started} gate is completed as soon as the receive stream is subscribed.
     *
     * @param consumer callback for LEASE frames received on stream 0; may be null
     * @return this socket
     */
    public ClientReactiveSocket start(Consumer<Lease> consumer) {
        this.leaseConsumer = consumer;

        // Keep-alive: one frame per tick; send errors go to the error consumer.
        this.keepAliveSendSub = this.connection
            .send(this.keepAliveProvider.ticks().map(tick -> Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true)))
            .subscribe((Consumer) null, this.errorConsumer);

        // Inbound frames: dispatch each one, report stream errors.
        this.connection.receive()
            .doOnSubscribe(subscription -> this.started.onComplete())
            .doOnNext(this::handleIncomingFrames)
            .doOnError(this.errorConsumer)
            .subscribe();

        return this;
    }

    /**
     * Performs a REQUEST_RESPONSE interaction once the socket has been started.
     *
     * <p>On subscription a stream id is reserved, a processor is registered as the receiver for
     * that id, and the request frame is sent. If the send fails, the error is reported and the
     * processor is cancelled. On downstream error or cancel an ERROR / CANCEL frame is sent,
     * but only while the stream is still registered, the connection reports positive
     * availability and the processor has not terminated. The receiver registration is removed
     * when the returned Mono finishes.
     *
     * @param payload request payload
     * @return Mono signalling the response delivered for the reserved stream
     */
    private Mono<Payload> handleRequestResponse(Payload payload) {
        return this.started.then(() -> {
            int streamId = this.streamIdSupplier.nextStreamId();
            Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1);
            MonoProcessor<Payload> receiver = MonoProcessor.create();
            synchronized (this) {
                this.receivers.put(streamId, receiver);
            }
            MonoProcessor<Void> requestSent = this.connection.sendOne(requestFrame).doOnError(th -> {
                this.errorConsumer.accept(th);
                receiver.cancel();
            }).subscribe();
            return receiver.doOnError(th -> {
                if (contains(streamId) && this.connection.availability() > 0.0d && !receiver.isTerminated()) {
                    // The decompiled "r1.accept(v1)" lambda was a method reference on the error consumer.
                    this.connection.sendOne(Frame.Error.from(streamId, th))
                        .doOnError(this.errorConsumer::accept)
                        .subscribe();
                }
            }).doOnCancel(() -> {
                if (contains(streamId) && this.connection.availability() > 0.0d && !receiver.isTerminated()) {
                    this.connection.sendOne(Frame.Cancel.from(streamId))
                        .doOnError(this.errorConsumer::accept)
                        .subscribe();
                }
                // Always stop the request send, whether or not a CANCEL frame was written.
                requestSent.cancel();
            }).doFinally(signalType -> removeReceiver(streamId));
        });
    }

    /**
     * Creates the response stream for a stream/channel request, deferred until the socket
     * has been started.
     *
     * @param flux      outgoing request payloads
     * @param frameType frame type of the initial request frame
     * @return Flux of response payloads
     */
    private Flux<Payload> handleStreamResponse(Flux<Payload> flux, FrameType frameType) {
        Supplier<Publisher<Payload>> streamFactory = new AnonymousClass1(flux, frameType);
        return this.started.thenMany(streamFactory);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks, under this object's monitor, whether a receiver is registered for the stream.
     *
     * @param i stream id
     * @return true if {@code receivers} has an entry for {@code i}
     */
    public boolean contains(int i) {
        synchronized (this) {
            return this.receivers.containsKey(i);
        }
    }

    /**
     * Tears down all stream state: cancels every registered sender, fails every registered
     * receiver with the shared closed-channel exception, empties both registries and disposes
     * the keep-alive subscription if one exists.
     *
     * <p>The registries are snapshotted under the monitor before any callback runs. Failing a
     * receiver can re-enter {@code removeReceiver} (via the streams' {@code doFinally} hooks),
     * which would otherwise mutate the map while it is being iterated; other threads may also
     * register or remove streams concurrently.
     */
    protected void cleanup() {
        final LimitableRequestPublisher<?>[] pendingSenders;
        final Subscriber<?>[] pendingReceivers;
        synchronized (this) {
            pendingSenders = this.senders.values().toArray(new LimitableRequestPublisher<?>[0]);
            pendingReceivers = this.receivers.values().toArray(new Subscriber<?>[0]);
        }
        try {
            for (LimitableRequestPublisher<?> sender : pendingSenders) {
                cleanUpLimitableRequestPublisher(sender);
            }
            for (Subscriber<?> receiver : pendingReceivers) {
                cleanUpSubscriber(receiver);
            }
        } finally {
            // Run even if a callback throws, so no stream state or keep-alive send is left behind.
            synchronized (this) {
                this.senders.clear();
                this.receivers.clear();
            }
            if (this.keepAliveSendSub != null) {
                this.keepAliveSendSub.dispose();
            }
        }
    }

    /**
     * Cancels the given request publisher while holding this object's monitor.
     *
     * @param limitableRequestPublisher publisher to cancel
     */
    private void cleanUpLimitableRequestPublisher(LimitableRequestPublisher<?> limitableRequestPublisher) {
        synchronized (this) {
            limitableRequestPublisher.cancel();
        }
    }

    /**
     * Fails the given subscriber with the shared closed-channel exception while holding
     * this object's monitor.
     *
     * @param subscriber subscriber to terminate
     */
    private void cleanUpSubscriber(Subscriber<?> subscriber) {
        synchronized (this) {
            subscriber.onError(CLOSED_CHANNEL_EXCEPTION);
        }
    }

    /**
     * Dispatches one incoming frame: stream 0 goes to {@code handleStreamZero}, any other
     * stream id to {@code handleFrame}. The frame is released afterwards in all cases.
     *
     * @param frame frame received from the connection
     */
    private void handleIncomingFrames(Frame frame) {
        try {
            final int id = frame.getStreamId();
            final FrameType kind = frame.getType();
            if (id != 0) {
                handleFrame(id, kind, frame);
                return;
            }
            handleStreamZero(kind, frame);
        } finally {
            frame.release();
        }
    }

    /**
     * Handles a frame received on stream 0.
     *
     * <ul>
     *   <li>ERROR: throws the exception built from the frame.</li>
     *   <li>LEASE: passes a {@code LeaseImpl} to the lease consumer, if one is set.</li>
     *   <li>KEEPALIVE: acks the keep-alive provider unless the frame carries the respond flag.</li>
     *   <li>Anything else: reports an {@link IllegalStateException} to the error consumer.</li>
     * </ul>
     *
     * @param frameType type of the frame
     * @param frame     the frame itself
     */
    private void handleStreamZero(FrameType frameType, Frame frame) {
        switch (frameType) {
            case ERROR:
                throw Exceptions.from(frame);
            case LEASE: {
                // Read the volatile field once so the null check and the call see the same value.
                Consumer<Lease> currentLeaseConsumer = this.leaseConsumer;
                if (currentLeaseConsumer != null) {
                    currentLeaseConsumer.accept(new LeaseImpl(frame));
                }
                return;
            }
            case KEEPALIVE:
                if (!Frame.Keepalive.hasRespondFlag(frame)) {
                    this.keepAliveProvider.ack();
                }
                return;
            default:
                // This branch is reached for frame types that are NOT supported on stream 0.
                this.errorConsumer.accept(new IllegalStateException("Client received unsupported frame on stream 0: " + frame.toString()));
                return;
        }
    }

    /**
     * Handles a frame received on a non-zero stream.
     *
     * <p>If no receiver is registered for the stream, the frame is passed to
     * {@code handleMissingResponseProcessor}. Otherwise:
     * <ul>
     *   <li>ERROR: fails the receiver and deregisters it.</li>
     *   <li>NEXT: delivers the payload.</li>
     *   <li>NEXT_COMPLETE: delivers the payload, completes the receiver and deregisters it.</li>
     *   <li>COMPLETE: completes the receiver and deregisters it.</li>
     *   <li>CANCEL: deregisters sender and receiver and cancels the sender, if any.</li>
     *   <li>REQUEST_N: raises the request limit of the sender, if any.</li>
     * </ul>
     *
     * @param i         stream id
     * @param frameType type of the frame
     * @param frame     the frame itself
     * @throws IllegalStateException for any other frame type
     */
    private void handleFrame(int i, FrameType frameType, Frame frame) {
        final Subscriber<Payload> receiver;
        synchronized (this) {
            receiver = this.receivers.get(i);
        }
        if (receiver == null) {
            handleMissingResponseProcessor(i, frameType, frame);
            return;
        }
        switch (frameType) {
            case ERROR:
                receiver.onError(Exceptions.from(frame));
                removeReceiver(i);
                return;
            case NEXT:
                receiver.onNext(new PayloadImpl(frame));
                return;
            case NEXT_COMPLETE:
                receiver.onNext(new PayloadImpl(frame));
                receiver.onComplete();
                // Terminal signal: deregister, consistent with the ERROR and COMPLETE branches.
                removeReceiver(i);
                return;
            case COMPLETE:
                receiver.onComplete();
                removeReceiver(i);
                return;
            case CANCEL: {
                final LimitableRequestPublisher<?> cancelledSender;
                synchronized (this) {
                    cancelledSender = this.senders.remove(i);
                    this.receivers.remove(i);
                }
                if (cancelledSender != null) {
                    cancelledSender.cancel();
                }
                return;
            }
            case REQUEST_N: {
                final LimitableRequestPublisher<?> sender;
                synchronized (this) {
                    sender = this.senders.get(i);
                }
                if (sender != null) {
                    sender.increaseRequestLimit(Frame.RequestN.requestN(frame));
                }
                return;
            }
            default:
                // LEASE, KEEPALIVE and every other type are not valid on a request stream.
                throw new IllegalStateException("Client received unsupported frame on stream " + i + ": " + frame.toString());
        }
    }

    /**
     * Handles a frame for a stream that has no registered receiver. Frames for ids the
     * {@code StreamIdSupplier} reports as before-or-current are ignored; anything else is
     * treated as a protocol violation.
     *
     * @param i         stream id
     * @param frameType type of the frame
     * @param frame     the frame itself
     * @throws IllegalStateException if the id is not before-or-current
     */
    private void handleMissingResponseProcessor(int i, FrameType frameType, Frame frame) {
        if (!this.streamIdSupplier.isBeforeOrCurrent(i)) {
            if (frameType == FrameType.ERROR) {
                // For errors, include the frame data decoded as UTF-8 text.
                String errorText = StandardCharsets.UTF_8.decode(frame.getData()).toString();
                throw new IllegalStateException("Client received error for non-existent stream: " + i + " Message: " + errorText);
            }
            throw new IllegalStateException("Client received message for non-existent stream: " + i + ", frame type: " + frameType);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the receiver registered for the stream, if any, while holding this object's monitor.
     *
     * @param i stream id
     */
    public void removeReceiver(int i) {
        synchronized (this) {
            this.receivers.remove(i);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the sender registered for the stream, if any, while holding this object's monitor.
     *
     * @param i stream id
     */
    public void removeSender(int i) {
        synchronized (this) {
            this.senders.remove(i);
        }
    }
}
