package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.FrameUtil;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

@Deprecated
/* loaded from: input_file:io/rsocket/internal/ClientServerInputMultiplexer.class */
public class ClientServerInputMultiplexer implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
    private static final InitializingInterceptorRegistry emptyInterceptorRegistry = new InitializingInterceptorRegistry();
    private final DuplexConnection setupConnection;
    private final DuplexConnection serverConnection;
    private final DuplexConnection clientConnection;
    private final DuplexConnection source;
    private final DuplexConnection clientServerConnection;
    private boolean setupReceived;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.rsocket.internal.ClientServerInputMultiplexer$1, reason: invalid class name */
    /* loaded from: input_file:io/rsocket/internal/ClientServerInputMultiplexer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$plugins$DuplexConnectionInterceptor$Type;
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$frame$FrameType = new int[FrameType.values().length];

        static {
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.SETUP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.RESUME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.RESUME_OK.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.LEASE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.KEEPALIVE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$rsocket$frame$FrameType[FrameType.ERROR.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            $SwitchMap$io$rsocket$plugins$DuplexConnectionInterceptor$Type = new int[DuplexConnectionInterceptor.Type.values().length];
            try {
                $SwitchMap$io$rsocket$plugins$DuplexConnectionInterceptor$Type[DuplexConnectionInterceptor.Type.SETUP.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$rsocket$plugins$DuplexConnectionInterceptor$Type[DuplexConnectionInterceptor.Type.SERVER.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$rsocket$plugins$DuplexConnectionInterceptor$Type[DuplexConnectionInterceptor.Type.CLIENT.ordinal()] = 3;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /* loaded from: input_file:io/rsocket/internal/ClientServerInputMultiplexer$InternalDuplexConnection.class */
    private static class InternalDuplexConnection implements DuplexConnection {
        private final DuplexConnection source;
        private final MonoProcessor<Flux<ByteBuf>>[] processors;
        private final boolean debugEnabled = ClientServerInputMultiplexer.LOGGER.isDebugEnabled();

        @SafeVarargs
        public InternalDuplexConnection(DuplexConnection duplexConnection, MonoProcessor<Flux<ByteBuf>>... monoProcessorArr) {
            this.source = duplexConnection;
            this.processors = monoProcessorArr;
        }

        @Override // io.rsocket.DuplexConnection
        public Mono<Void> send(Publisher<ByteBuf> publisher) {
            if (this.debugEnabled) {
                publisher = Flux.from(publisher).doOnNext(byteBuf -> {
                    ClientServerInputMultiplexer.LOGGER.debug("sending -> " + FrameUtil.toString(byteBuf));
                });
            }
            return this.source.send(publisher);
        }

        @Override // io.rsocket.DuplexConnection
        public Mono<Void> sendOne(ByteBuf byteBuf) {
            if (this.debugEnabled) {
                ClientServerInputMultiplexer.LOGGER.debug("sending -> " + FrameUtil.toString(byteBuf));
            }
            return this.source.sendOne(byteBuf);
        }

        @Override // io.rsocket.DuplexConnection
        public Flux<ByteBuf> receive() {
            return Flux.fromArray(this.processors).flatMap(monoProcessor -> {
                return monoProcessor.flatMapMany(flux -> {
                    return this.debugEnabled ? flux.doOnNext(byteBuf -> {
                        ClientServerInputMultiplexer.LOGGER.debug("receiving -> " + FrameUtil.toString(byteBuf));
                    }) : flux;
                });
            });
        }

        @Override // io.rsocket.DuplexConnection
        public ByteBufAllocator alloc() {
            return this.source.alloc();
        }

        public void dispose() {
            this.source.dispose();
        }

        public boolean isDisposed() {
            return this.source.isDisposed();
        }

        @Override // io.rsocket.Closeable
        public Mono<Void> onClose() {
            return this.source.onClose();
        }

        @Override // io.rsocket.DuplexConnection, io.rsocket.Availability
        public double availability() {
            return this.source.availability();
        }
    }

    /**
     * Creates a multiplexer over {@code duplexConnection} using the shared, class-level
     * interceptor registry and {@code false} for the boolean flag of the three-argument
     * constructor.
     *
     * @param duplexConnection the connection whose received frames are split up
     */
    public ClientServerInputMultiplexer(DuplexConnection duplexConnection) {
        this(duplexConnection, emptyInterceptorRegistry, false);
    }

    public ClientServerInputMultiplexer(DuplexConnection duplexConnection, InitializingInterceptorRegistry initializingInterceptorRegistry, boolean z) {
        this.source = duplexConnection;
        MonoProcessor create = MonoProcessor.create();
        MonoProcessor create2 = MonoProcessor.create();
        MonoProcessor create3 = MonoProcessor.create();
        DuplexConnection initConnection = initializingInterceptorRegistry.initConnection(DuplexConnectionInterceptor.Type.SOURCE, duplexConnection);
        this.setupConnection = initializingInterceptorRegistry.initConnection(DuplexConnectionInterceptor.Type.SETUP, new InternalDuplexConnection(initConnection, create));
        this.serverConnection = initializingInterceptorRegistry.initConnection(DuplexConnectionInterceptor.Type.SERVER, new InternalDuplexConnection(initConnection, create2));
        this.clientConnection = initializingInterceptorRegistry.initConnection(DuplexConnectionInterceptor.Type.CLIENT, new InternalDuplexConnection(initConnection, create3));
        this.clientServerConnection = new InternalDuplexConnection(initConnection, create3, create2);
        initConnection.receive().groupBy(byteBuf -> {
            DuplexConnectionInterceptor.Type type;
            int streamId = FrameHeaderCodec.streamId(byteBuf);
            if (streamId == 0) {
                switch (AnonymousClass1.$SwitchMap$io$rsocket$frame$FrameType[FrameHeaderCodec.frameType(byteBuf).ordinal()]) {
                    case ErrorFrameCodec.INVALID_SETUP /* 1 */:
                    case ErrorFrameCodec.UNSUPPORTED_SETUP /* 2 */:
                    case 3:
                        type = DuplexConnectionInterceptor.Type.SETUP;
                        this.setupReceived = true;
                        break;
                    case ErrorFrameCodec.REJECTED_RESUME /* 4 */:
                    case 5:
                    case 6:
                        type = z ? DuplexConnectionInterceptor.Type.CLIENT : DuplexConnectionInterceptor.Type.SERVER;
                        break;
                    default:
                        type = z ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
                        break;
                }
            } else {
                type = (streamId & 1) == 0 ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
            }
            if (z || type == DuplexConnectionInterceptor.Type.SETUP || this.setupReceived) {
                return type;
            }
            byteBuf.release();
            throw new IllegalStateException("SETUP or LEASE frame must be received before any others.");
        }).subscribe(groupedFlux -> {
            switch (AnonymousClass1.$SwitchMap$io$rsocket$plugins$DuplexConnectionInterceptor$Type[((DuplexConnectionInterceptor.Type) groupedFlux.key()).ordinal()]) {
                case ErrorFrameCodec.INVALID_SETUP /* 1 */:
                    create.onNext(groupedFlux);
                    return;
                case ErrorFrameCodec.UNSUPPORTED_SETUP /* 2 */:
                    create2.onNext(groupedFlux);
                    return;
                case 3:
                    create3.onNext(groupedFlux);
                    return;
                default:
                    return;
            }
        }, th -> {
            create.onError(th);
            create2.onError(th);
            create3.onError(th);
        });
    }

    /**
     * Returns the combined view created in the constructor, which receives frames from both the
     * client and the server groups.
     */
    public DuplexConnection asClientServerConnection() {
        return this.clientServerConnection;
    }

    /**
     * Returns the view initialised in the constructor with {@code DuplexConnectionInterceptor.Type.SERVER},
     * which receives the frames grouped as SERVER.
     */
    public DuplexConnection asServerConnection() {
        return this.serverConnection;
    }

    /**
     * Returns the view initialised in the constructor with {@code DuplexConnectionInterceptor.Type.CLIENT},
     * which receives the frames grouped as CLIENT.
     */
    public DuplexConnection asClientConnection() {
        return this.clientConnection;
    }

    /**
     * Returns the view initialised in the constructor with {@code DuplexConnectionInterceptor.Type.SETUP},
     * which receives the frames grouped as SETUP.
     */
    public DuplexConnection asSetupConnection() {
        return this.setupConnection;
    }

    /** Disposes the connection that was passed to the constructor. */
    public void dispose() {
        this.source.dispose();
    }

    /** Reports whether the connection that was passed to the constructor is disposed. */
    public boolean isDisposed() {
        return this.source.isDisposed();
    }

    /** Returns the {@code onClose()} signal of the connection that was passed to the constructor. */
    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.source.onClose();
    }
}
