package io.rsocket.internal;

import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.framing.FrameType;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.PluginRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:io/rsocket/internal/ClientServerInputMultiplexer.class */
public class ClientServerInputMultiplexer implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
    private final DuplexConnection streamZeroConnection;
    private final DuplexConnection serverConnection;
    private final DuplexConnection clientConnection;
    private final DuplexConnection source;

    /* loaded from: input_file:io/rsocket/internal/ClientServerInputMultiplexer$InternalDuplexConnection.class */
    private static class InternalDuplexConnection implements DuplexConnection {
        private final DuplexConnection source;
        private final MonoProcessor<Flux<Frame>> processor;
        private final boolean debugEnabled = ClientServerInputMultiplexer.LOGGER.isDebugEnabled();

        public InternalDuplexConnection(DuplexConnection duplexConnection, MonoProcessor<Flux<Frame>> monoProcessor) {
            this.source = duplexConnection;
            this.processor = monoProcessor;
        }

        @Override // io.rsocket.DuplexConnection
        public Mono<Void> send(Publisher<Frame> publisher) {
            if (this.debugEnabled) {
                publisher = Flux.from(publisher).doOnNext(frame -> {
                    ClientServerInputMultiplexer.LOGGER.debug("sending -> " + frame.toString());
                });
            }
            return this.source.send(publisher);
        }

        @Override // io.rsocket.DuplexConnection
        public Mono<Void> sendOne(Frame frame) {
            if (this.debugEnabled) {
                ClientServerInputMultiplexer.LOGGER.debug("sending -> " + frame.toString());
            }
            return this.source.sendOne(frame);
        }

        @Override // io.rsocket.DuplexConnection
        public Flux<Frame> receive() {
            return this.processor.flatMapMany(flux -> {
                return this.debugEnabled ? flux.doOnNext(frame -> {
                    ClientServerInputMultiplexer.LOGGER.debug("receiving -> " + frame.toString());
                }) : flux;
            });
        }

        public void dispose() {
            this.source.dispose();
        }

        public boolean isDisposed() {
            return this.source.isDisposed();
        }

        @Override // io.rsocket.Closeable
        public Mono<Void> onClose() {
            return this.source.onClose();
        }

        @Override // io.rsocket.DuplexConnection, io.rsocket.Availability
        public double availability() {
            return this.source.availability();
        }
    }

    public ClientServerInputMultiplexer(DuplexConnection duplexConnection, PluginRegistry pluginRegistry) {
        this.source = duplexConnection;
        MonoProcessor create = MonoProcessor.create();
        MonoProcessor create2 = MonoProcessor.create();
        MonoProcessor create3 = MonoProcessor.create();
        DuplexConnection applyConnection = pluginRegistry.applyConnection(DuplexConnectionInterceptor.Type.SOURCE, duplexConnection);
        this.streamZeroConnection = pluginRegistry.applyConnection(DuplexConnectionInterceptor.Type.STREAM_ZERO, new InternalDuplexConnection(applyConnection, create));
        this.serverConnection = pluginRegistry.applyConnection(DuplexConnectionInterceptor.Type.SERVER, new InternalDuplexConnection(applyConnection, create2));
        this.clientConnection = pluginRegistry.applyConnection(DuplexConnectionInterceptor.Type.CLIENT, new InternalDuplexConnection(applyConnection, create3));
        applyConnection.receive().groupBy(frame -> {
            int streamId = frame.getStreamId();
            return streamId == 0 ? frame.getType() == FrameType.SETUP ? DuplexConnectionInterceptor.Type.STREAM_ZERO : DuplexConnectionInterceptor.Type.CLIENT : (streamId & 1) == 0 ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
        }).subscribe(groupedFlux -> {
            switch ((DuplexConnectionInterceptor.Type) groupedFlux.key()) {
                case STREAM_ZERO:
                    create.onNext(groupedFlux);
                    return;
                case SERVER:
                    create2.onNext(groupedFlux);
                    return;
                case CLIENT:
                    create3.onNext(groupedFlux);
                    return;
                default:
                    return;
            }
        }, th -> {
            LOGGER.error("Error receiving frame:", th);
            dispose();
        });
    }

    public DuplexConnection asServerConnection() {
        return this.serverConnection;
    }

    public DuplexConnection asClientConnection() {
        return this.clientConnection;
    }

    public DuplexConnection asStreamZeroConnection() {
        return this.streamZeroConnection;
    }

    public void dispose() {
        this.source.dispose();
    }

    public boolean isDisposed() {
        return this.source.isDisposed();
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.source.onClose();
    }
}
