package io.reactivesocket.internal;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.FrameType;
import io.reactivesocket.Plugins;
import io.reactivesocket.util.BitUtil;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:io/reactivesocket/internal/ClientServerInputMultiplexer.class */
public class ClientServerInputMultiplexer {
    private static final Logger LOGGER = LoggerFactory.getLogger("io.reactivesocket.FrameLogger");
    private final DuplexConnection streamZeroConnection;
    private final DuplexConnection serverConnection;
    private final DuplexConnection clientConnection;

    /* loaded from: input_file:io/reactivesocket/internal/ClientServerInputMultiplexer$InternalDuplexConnection.class */
    private static class InternalDuplexConnection implements DuplexConnection {
        private final DuplexConnection source;
        private final MonoProcessor<Flux<Frame>> processor;
        private final boolean debugEnabled = ClientServerInputMultiplexer.LOGGER.isDebugEnabled();

        public InternalDuplexConnection(DuplexConnection duplexConnection, MonoProcessor<Flux<Frame>> monoProcessor) {
            this.source = duplexConnection;
            this.processor = monoProcessor;
        }

        @Override // io.reactivesocket.DuplexConnection
        public Mono<Void> send(Publisher<Frame> publisher) {
            if (this.debugEnabled) {
                publisher = Flux.from(publisher).doOnNext(frame -> {
                    ClientServerInputMultiplexer.LOGGER.debug("sending -> " + frame.toString());
                });
            }
            return this.source.send(publisher);
        }

        @Override // io.reactivesocket.DuplexConnection
        public Mono<Void> sendOne(Frame frame) {
            if (this.debugEnabled) {
                ClientServerInputMultiplexer.LOGGER.debug("sending -> " + frame.toString());
            }
            return this.source.sendOne(frame);
        }

        @Override // io.reactivesocket.DuplexConnection
        public Flux<Frame> receive() {
            return this.processor.flatMap(flux -> {
                return this.debugEnabled ? flux.doOnNext(frame -> {
                    ClientServerInputMultiplexer.LOGGER.debug("receiving -> " + frame.toString());
                }) : flux;
            });
        }

        @Override // io.reactivesocket.DuplexConnection
        public Mono<Void> close() {
            return this.source.close();
        }

        @Override // io.reactivesocket.DuplexConnection
        public Mono<Void> onClose() {
            return this.source.onClose();
        }

        @Override // io.reactivesocket.Availability
        public double availability() {
            return this.source.availability();
        }
    }

    public ClientServerInputMultiplexer(DuplexConnection duplexConnection) {
        MonoProcessor create = MonoProcessor.create();
        MonoProcessor create2 = MonoProcessor.create();
        MonoProcessor create3 = MonoProcessor.create();
        this.streamZeroConnection = Plugins.DUPLEX_CONNECTION_INTERCEPTOR.apply(Plugins.DuplexConnectionInterceptor.Type.STREAM_ZERO, new InternalDuplexConnection(duplexConnection, create));
        this.serverConnection = Plugins.DUPLEX_CONNECTION_INTERCEPTOR.apply(Plugins.DuplexConnectionInterceptor.Type.SERVER, new InternalDuplexConnection(duplexConnection, create2));
        this.clientConnection = Plugins.DUPLEX_CONNECTION_INTERCEPTOR.apply(Plugins.DuplexConnectionInterceptor.Type.CLIENT, new InternalDuplexConnection(duplexConnection, create3));
        duplexConnection.receive().groupBy(frame -> {
            int streamId = frame.getStreamId();
            return streamId == 0 ? frame.getType() == FrameType.SETUP ? Plugins.DuplexConnectionInterceptor.Type.STREAM_ZERO : Plugins.DuplexConnectionInterceptor.Type.CLIENT : BitUtil.isEven(streamId) ? Plugins.DuplexConnectionInterceptor.Type.SERVER : Plugins.DuplexConnectionInterceptor.Type.CLIENT;
        }).subscribe(groupedFlux -> {
            switch ((Plugins.DuplexConnectionInterceptor.Type) groupedFlux.key()) {
                case STREAM_ZERO:
                    create.onNext(groupedFlux);
                    return;
                case SERVER:
                    create2.onNext(groupedFlux);
                    return;
                case CLIENT:
                    create3.onNext(groupedFlux);
                    return;
                default:
                    return;
            }
        });
    }

    public DuplexConnection asServerConnection() {
        return this.serverConnection;
    }

    public DuplexConnection asClientConnection() {
        return this.clientConnection;
    }

    public DuplexConnection asStreamZeroConnection() {
        return this.streamZeroConnection;
    }
}
