package io.reactivesocket;

import io.netty.buffer.Unpooled;
import io.netty.util.collection.IntObjectHashMap;
import io.reactivesocket.Frame;
import io.reactivesocket.exceptions.ApplicationException;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.LimitableRequestPublisher;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.util.PayloadImpl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:io/reactivesocket/ServerReactiveSocket.class */
/**
 * Server side of a ReactiveSocket connection.
 *
 * <p>Frames received from the {@link DuplexConnection} are dispatched by type to the wrapped
 * request handler, and the handler's responses are written back to the same connection. Two
 * tables keyed by stream id track in-flight work: {@code sendingSubscriptions} holds the
 * subscriptions of responses being sent, and {@code channelProcessors} holds the processors
 * that receive inbound channel payloads. Both tables are only read or written while holding
 * this instance's monitor; subscriptions are always cancelled outside of it.
 */
public class ServerReactiveSocket implements ReactiveSocket {
    private final DuplexConnection connection;
    private final Consumer<Throwable> errorConsumer;
    private final IntObjectHashMap<Subscription> sendingSubscriptions;
    private final IntObjectHashMap<UnicastProcessor<Payload>> channelProcessors;
    private final ReactiveSocket requestHandler;
    private volatile Disposable subscribe;

    /**
     * Creates the socket and registers its clean-up on connection close. Frames are not read
     * until {@link #start()} is called.
     *
     * @param duplexConnection connection that frames are received from and sent to
     * @param reactiveSocket handler that every request is delegated to; when it is a
     *     {@link LeaseEnforcingSocket} a lease sender is registered on it
     * @param z whether leases handed to the lease sender are written to the connection as
     *     LEASE frames; when {@code false} they are ignored
     * @param consumer receives errors; wrapped in a {@link KnownErrorFilter} for everything
     *     except failed lease sends, which are reported to it directly
     */
    public ServerReactiveSocket(DuplexConnection duplexConnection, ReactiveSocket reactiveSocket, boolean z, Consumer<Throwable> consumer) {
        this.requestHandler = reactiveSocket;
        this.connection = duplexConnection;
        this.errorConsumer = new KnownErrorFilter(consumer);
        this.sendingSubscriptions = new IntObjectHashMap<>();
        this.channelProcessors = new IntObjectHashMap<>();
        duplexConnection.onClose().doFinally(signalType -> cleanup()).subscribe();
        if (reactiveSocket instanceof LeaseEnforcingSocket) {
            ((LeaseEnforcingSocket) reactiveSocket).acceptLeaseSender(lease -> {
                if (!z) {
                    return;
                }
                // The consumer is registered as the subscriber's error callback so that a
                // failed send is reported to it without leaving the error unhandled.
                duplexConnection
                    .sendOne(Frame.Lease.from(
                        lease.getTtl(),
                        lease.getAllowedRequests(),
                        lease.getMetadata() == null ? Unpooled.EMPTY_BUFFER : Unpooled.wrappedBuffer(lease.getMetadata())))
                    .subscribe(null, consumer);
            });
        }
    }

    /**
     * Equivalent to the four-argument constructor with lease frames enabled.
     *
     * @param duplexConnection connection that frames are received from and sent to
     * @param reactiveSocket handler that every request is delegated to
     * @param consumer receives errors
     */
    public ServerReactiveSocket(DuplexConnection duplexConnection, ReactiveSocket reactiveSocket, Consumer<Throwable> consumer) {
        this(duplexConnection, reactiveSocket, true, consumer);
    }

    /** Delegates to the request handler; anything it throws synchronously becomes an error signal. */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            return this.requestHandler.fireAndForget(payload);
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    /** Delegates to the request handler; anything it throws synchronously becomes an error signal. */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            return this.requestHandler.requestResponse(payload);
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    /** Delegates to the request handler; anything it throws synchronously becomes an error signal. */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        try {
            return this.requestHandler.requestStream(payload);
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    /** Delegates to the request handler; anything it throws synchronously becomes an error signal. */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        try {
            return this.requestHandler.requestChannel(publisher);
        } catch (Throwable th) {
            return Flux.error(th);
        }
    }

    /** Delegates to the request handler; anything it throws synchronously becomes an error signal. */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.requestHandler.metadataPush(payload);
        } catch (Throwable th) {
            return Mono.error(th);
        }
    }

    /** Disposes the frame-receiving subscription, if started, and closes the connection. */
    @Override
    public Mono<Void> close() {
        disposeReceiver();
        return this.connection.close();
    }

    /** Returns the connection's close signal. */
    @Override
    public Mono<Void> onClose() {
        return this.connection.onClose();
    }

    /**
     * Subscribes to the connection's incoming frames and dispatches each one by type.
     *
     * <p>If the receive pipeline fails, the error is passed to the error consumer and every
     * response subscription registered at that moment is cancelled.
     *
     * @return this socket
     */
    public ServerReactiveSocket start() {
        this.subscribe = this.connection.receive()
            .flatMap(this::handleFrame)
            .subscribe(null, error -> {
                this.errorConsumer.accept(error);
                // Iterate a copy: the live values() view must not be walked outside the lock.
                for (Subscription sending : snapshotSendingSubscriptions()) {
                    sending.cancel();
                }
            });
        return this;
    }

    /**
     * Routes one incoming frame to the handler for its type. The frame is released exactly
     * once, after the handler call returns or throws.
     */
    private Mono<Void> handleFrame(Frame frame) {
        try {
            int streamId = frame.getStreamId();
            UnicastProcessor<Payload> receiver;
            switch (frame.getType()) {
                case FIRE_AND_FORGET:
                    return handleFireAndForget(streamId, fireAndForget(new PayloadImpl(frame)));
                case REQUEST_RESPONSE:
                    return handleRequestResponse(streamId, requestResponse(new PayloadImpl(frame)));
                case CANCEL:
                    return handleCancelFrame(streamId);
                case KEEPALIVE:
                    return handleKeepAliveFrame(frame);
                case REQUEST_N:
                    return handleRequestN(streamId, frame);
                case REQUEST_STREAM:
                    return handleStream(streamId, requestStream(new PayloadImpl(frame)), frame);
                case REQUEST_CHANNEL:
                    return handleChannel(streamId, frame);
                case METADATA_PUSH:
                    return metadataPush(new PayloadImpl(frame));
                case PAYLOAD:
                case LEASE:
                    // Accepted but ignored.
                    return Mono.empty();
                case NEXT:
                    receiver = getChannelProcessor(streamId);
                    if (receiver != null) {
                        receiver.onNext(new PayloadImpl(frame));
                    }
                    return Mono.empty();
                case COMPLETE:
                    receiver = getChannelProcessor(streamId);
                    if (receiver != null) {
                        receiver.onComplete();
                    }
                    return Mono.empty();
                case ERROR:
                    receiver = getChannelProcessor(streamId);
                    if (receiver != null) {
                        receiver.onError(new ApplicationException(new PayloadImpl(frame)));
                    }
                    return Mono.empty();
                case NEXT_COMPLETE:
                    receiver = getChannelProcessor(streamId);
                    if (receiver != null) {
                        receiver.onNext(new PayloadImpl(frame));
                        receiver.onComplete();
                    }
                    return Mono.empty();
                case SETUP:
                    return handleError(streamId, new IllegalStateException("Setup frame received post setup."));
                default:
                    return handleError(streamId, new IllegalStateException("ServerReactiveSocket: Unexpected frame type: " + frame.getType()));
            }
        } finally {
            frame.release();
        }
    }

    /** Runs when the connection closes: stops receiving, cancels all in-flight work, closes the handler. */
    private void cleanup() {
        disposeReceiver();
        cleanUpSendingSubscriptions();
        cleanUpChannelProcessors();
        this.requestHandler.close().subscribe();
    }

    /** Disposes the receive subscription if {@link #start()} has set one. */
    private void disposeReceiver() {
        // Read the volatile field once so the null check and the call see the same value.
        Disposable receiver = this.subscribe;
        if (receiver != null) {
            receiver.dispose();
        }
    }

    /** Returns a copy of the currently registered response subscriptions. */
    private synchronized List<Subscription> snapshotSendingSubscriptions() {
        return new ArrayList<>(this.sendingSubscriptions.values());
    }

    /** Empties the response-subscription table and cancels everything that was in it. */
    private void cleanUpSendingSubscriptions() {
        List<Subscription> pending;
        synchronized (this) {
            pending = new ArrayList<>(this.sendingSubscriptions.values());
            this.sendingSubscriptions.clear();
        }
        // Cancel outside the lock: the doFinally hooks call back into removeSubscription.
        for (Subscription sending : pending) {
            sending.cancel();
        }
    }

    /** Empties the channel-processor table and cancels every processor that was in it. */
    private void cleanUpChannelProcessors() {
        List<UnicastProcessor<Payload>> pending;
        synchronized (this) {
            pending = new ArrayList<>(this.channelProcessors.values());
            this.channelProcessors.clear();
        }
        // Cancel outside the lock: the doFinally hooks call back into removeChannelProcessor.
        for (UnicastProcessor<Payload> receiver : pending) {
            receiver.cancel();
        }
    }

    /**
     * Tracks a fire-and-forget invocation under its stream id for as long as it runs and
     * reports its failure to the error consumer. No frame is sent back.
     */
    private Mono<Void> handleFireAndForget(int streamId, Mono<Void> result) {
        return result
            .doOnSubscribe(subscription -> addSubscription(streamId, subscription))
            .doOnError(this.errorConsumer)
            .doFinally(signalType -> removeSubscription(streamId))
            .ignoreElement();
    }

    /**
     * Sends the single response payload as a NEXT_COMPLETE frame. A cancelled response sends
     * a CANCEL frame and a failed one sends an ERROR frame, each only while the connection
     * reports positive availability.
     */
    private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {
        return response
            .doOnSubscribe(subscription -> addSubscription(streamId, subscription))
            // 64 is the flags argument; presumably the "complete" flag bit. TODO confirm against Frame.
            .map(payload -> Frame.PayloadFrame.from(streamId, FrameType.NEXT_COMPLETE, payload, 64))
            .doOnCancel(() -> sendCancel(streamId))
            .doOnError(error -> sendError(streamId, error))
            .doFinally(signalType -> removeSubscription(streamId))
            .then(this.connection::sendOne);
    }

    /**
     * Sends each response payload as a NEXT frame, limited by the requester's demand.
     *
     * <p>The response is wrapped in a {@link LimitableRequestPublisher} whose limit starts at
     * the initial request-n read from {@code frame}; the wrapper is registered under the
     * stream id so that later REQUEST_N and CANCEL frames reach it. Cancellation, failure and
     * completion each send the matching frame while the connection reports positive
     * availability.
     */
    private Mono<Void> handleStream(int streamId, Flux<Payload> response, Frame frame) {
        int initialRequestN = Frame.Request.initialRequestN(frame);
        return this.connection.send(response
            .map(payload -> Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload))
            .transform(frames -> {
                LimitableRequestPublisher<Frame> limited = LimitableRequestPublisher.wrap(frames);
                synchronized (this) {
                    limited.increaseRequestLimit(initialRequestN);
                    this.sendingSubscriptions.put(streamId, limited);
                }
                return limited;
            })
            .doOnCancel(() -> sendCancel(streamId))
            .doOnError(error -> sendError(streamId, error))
            .doOnComplete(() -> sendComplete(streamId))
            .doFinally(signalType -> removeSubscription(streamId)));
    }

    /**
     * Opens a channel: registers a processor that receives the requester's payloads, passes
     * it to the handler, and streams the handler's output back via {@link #handleStream}.
     * Demand, cancellation and errors observed on the inbound side are forwarded as
     * REQUEST_N, CANCEL and ERROR frames.
     */
    private Mono<Void> handleChannel(int streamId, Frame frame) {
        UnicastProcessor<Payload> inbound = UnicastProcessor.create();
        addChannelProcessor(streamId, inbound);
        Flux<Payload> requests = inbound
            .doOnCancel(() -> sendCancel(streamId))
            .doOnError(error -> sendError(streamId, error))
            .doOnRequest(n -> sendRequestN(streamId, n))
            .doFinally(signalType -> removeChannelProcessor(streamId));
        return handleStream(streamId, requestChannel(requests), frame);
    }

    /** Answers a keep-alive that has the respond flag set; otherwise does nothing. */
    private Mono<Void> handleKeepAliveFrame(Frame frame) {
        if (!Frame.Keepalive.hasRespondFlag(frame)) {
            return Mono.empty();
        }
        return this.connection
            .sendOne(Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, false))
            .doOnError(this.errorConsumer);
    }

    /** Unregisters and cancels the response subscription of the given stream, if there is one. */
    private Mono<Void> handleCancelFrame(int streamId) {
        Subscription sending;
        synchronized (this) {
            sending = this.sendingSubscriptions.remove(streamId);
        }
        if (sending != null) {
            sending.cancel();
        }
        return Mono.empty();
    }

    /** Reports the error to the error consumer and sends it to the peer as an ERROR frame. */
    private Mono<Void> handleError(int streamId, Throwable th) {
        this.errorConsumer.accept(th);
        return this.connection.sendOne(Frame.Error.from(streamId, th)).doOnError(this.errorConsumer);
    }

    /**
     * Forwards the requester's demand to the response subscription of the given stream, if
     * one is registered. A count of {@code Integer.MAX_VALUE} is widened to
     * {@code Long.MAX_VALUE}.
     */
    private Mono<Void> handleRequestN(int streamId, Frame frame) {
        Subscription sending = getSubscription(streamId);
        if (sending != null) {
            int requestN = Frame.RequestN.requestN(frame);
            sending.request(requestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : requestN);
        }
        return Mono.empty();
    }

    /** Whether the connection currently reports positive availability. */
    private boolean connectionAvailable() {
        return this.connection.availability() > 0.0d;
    }

    /** Sends a CANCEL frame for the stream if the connection is available. */
    private void sendCancel(int streamId) {
        if (connectionAvailable()) {
            this.connection.sendOne(Frame.Cancel.from(streamId)).subscribe(null, this.errorConsumer);
        }
    }

    /** Sends an ERROR frame for the stream if the connection is available. */
    private void sendError(int streamId, Throwable error) {
        if (connectionAvailable()) {
            this.connection.sendOne(Frame.Error.from(streamId, error)).subscribe(null, this.errorConsumer);
        }
    }

    /** Sends a COMPLETE frame for the stream if the connection is available. */
    private void sendComplete(int streamId) {
        if (connectionAvailable()) {
            this.connection.sendOne(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE)).subscribe(null, this.errorConsumer);
        }
    }

    /** Sends a REQUEST_N frame for the stream if the connection is available. */
    private void sendRequestN(int streamId, long n) {
        if (connectionAvailable()) {
            this.connection.sendOne(Frame.RequestN.from(streamId, n)).subscribe(null, this.errorConsumer);
        }
    }

    private synchronized void addSubscription(int streamId, Subscription subscription) {
        this.sendingSubscriptions.put(streamId, subscription);
    }

    private synchronized Subscription getSubscription(int streamId) {
        return this.sendingSubscriptions.get(streamId);
    }

    private synchronized void removeSubscription(int streamId) {
        this.sendingSubscriptions.remove(streamId);
    }

    private synchronized void addChannelProcessor(int streamId, UnicastProcessor<Payload> unicastProcessor) {
        this.channelProcessors.put(streamId, unicastProcessor);
    }

    private synchronized UnicastProcessor<Payload> getChannelProcessor(int streamId) {
        return this.channelProcessors.get(streamId);
    }

    private synchronized void removeChannelProcessor(int streamId) {
        this.channelProcessors.remove(streamId);
    }
}
