package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameUtil;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.PluginRegistry;
import io.rsocket.resume.ResumePositionsConnection;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:io/rsocket/internal/ClientServerInputMultiplexer.class */
public class ClientServerInputMultiplexer implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
    private static final PluginRegistry emptyPluginRegistry = new PluginRegistry();
    private final DuplexConnection setupConnection;
    private final DuplexConnection serverConnection;
    private final DuplexConnection clientConnection;
    private final DuplexConnection source;
    private final ResumePositionsConnection clientServerConnection;

    /* loaded from: input_file:io/rsocket/internal/ClientServerInputMultiplexer$InternalDuplexConnection.class */
    private static class InternalDuplexConnection implements DuplexConnection {
        private final DuplexConnection source;
        private final MonoProcessor<Flux<ByteBuf>>[] processors;
        private final boolean debugEnabled = ClientServerInputMultiplexer.LOGGER.isDebugEnabled();

        public InternalDuplexConnection(DuplexConnection duplexConnection, MonoProcessor<Flux<ByteBuf>>... monoProcessorArr) {
            this.source = duplexConnection;
            this.processors = monoProcessorArr;
        }

        @Override // io.rsocket.DuplexConnection
        public Mono<Void> send(Publisher<ByteBuf> publisher) {
            if (this.debugEnabled) {
                publisher = Flux.from(publisher).doOnNext(byteBuf -> {
                    ClientServerInputMultiplexer.LOGGER.debug("sending -> " + FrameUtil.toString(byteBuf));
                });
            }
            return this.source.send(publisher);
        }

        @Override // io.rsocket.DuplexConnection
        public Mono<Void> sendOne(ByteBuf byteBuf) {
            if (this.debugEnabled) {
                ClientServerInputMultiplexer.LOGGER.debug("sending -> " + FrameUtil.toString(byteBuf));
            }
            return this.source.sendOne(byteBuf);
        }

        @Override // io.rsocket.DuplexConnection
        public Flux<ByteBuf> receive() {
            return Flux.fromArray(this.processors).flatMap(monoProcessor -> {
                return monoProcessor.flatMapMany(flux -> {
                    return this.debugEnabled ? flux.doOnNext(byteBuf -> {
                        ClientServerInputMultiplexer.LOGGER.debug("receiving -> " + FrameUtil.toString(byteBuf));
                    }) : flux;
                });
            });
        }

        public void dispose() {
            this.source.dispose();
        }

        public boolean isDisposed() {
            return this.source.isDisposed();
        }

        @Override // io.rsocket.Closeable
        public Mono<Void> onClose() {
            return this.source.onClose();
        }

        @Override // io.rsocket.DuplexConnection, io.rsocket.Availability
        public double availability() {
            return this.source.availability();
        }
    }

    public ClientServerInputMultiplexer(DuplexConnection duplexConnection) {
        this(duplexConnection, emptyPluginRegistry);
    }

    public ClientServerInputMultiplexer(DuplexConnection duplexConnection, PluginRegistry pluginRegistry) {
        this.source = duplexConnection;
        MonoProcessor create = MonoProcessor.create();
        MonoProcessor create2 = MonoProcessor.create();
        MonoProcessor create3 = MonoProcessor.create();
        DuplexConnection applyConnection = pluginRegistry.applyConnection(DuplexConnectionInterceptor.Type.SOURCE, duplexConnection);
        this.setupConnection = pluginRegistry.applyConnection(DuplexConnectionInterceptor.Type.SETUP, new InternalDuplexConnection(applyConnection, create));
        this.serverConnection = pluginRegistry.applyConnection(DuplexConnectionInterceptor.Type.SERVER, new InternalDuplexConnection(applyConnection, create2));
        this.clientConnection = pluginRegistry.applyConnection(DuplexConnectionInterceptor.Type.CLIENT, new InternalDuplexConnection(applyConnection, create3));
        this.clientServerConnection = new ClientServerConnection(new InternalDuplexConnection(applyConnection, create3, create2), applyConnection);
        applyConnection.receive().groupBy(byteBuf -> {
            int streamId = FrameHeaderFlyweight.streamId(byteBuf);
            return streamId == 0 ? isSetup(byteBuf) ? DuplexConnectionInterceptor.Type.SETUP : DuplexConnectionInterceptor.Type.CLIENT : (streamId & 1) == 0 ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
        }).subscribe(groupedFlux -> {
            switch ((DuplexConnectionInterceptor.Type) groupedFlux.key()) {
                case SETUP:
                    create.onNext(groupedFlux);
                    return;
                case SERVER:
                    create2.onNext(groupedFlux);
                    return;
                case CLIENT:
                    create3.onNext(groupedFlux);
                    return;
                default:
                    return;
            }
        }, th -> {
            LOGGER.error("Error receiving frame:", th);
            dispose();
        });
    }

    public ResumePositionsConnection asClientServerConnection() {
        return this.clientServerConnection;
    }

    public DuplexConnection asServerConnection() {
        return this.serverConnection;
    }

    public DuplexConnection asClientConnection() {
        return this.clientConnection;
    }

    public DuplexConnection asSetupConnection() {
        return this.setupConnection;
    }

    public void dispose() {
        this.source.dispose();
    }

    public boolean isDisposed() {
        return this.source.isDisposed();
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.source.onClose();
    }

    private static boolean isSetup(ByteBuf byteBuf) {
        switch (FrameHeaderFlyweight.frameType(byteBuf)) {
            case SETUP:
            case RESUME:
            case RESUME_OK:
                return true;
            default:
                return false;
        }
    }
}
