package io.rsocket.routing.broker.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.routing.broker.locator.RSocketLocator;
import io.rsocket.routing.common.Tags;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/routing/broker/rsocket/RoutingRSocket.class */
/**
 * {@link RSocket} that forwards every interaction to another {@link RSocket} selected per request.
 *
 * <p>For each request the configured extractor derives {@link Tags} from the request payload (the
 * first payload, for channels), the {@link RSocketLocator} resolves those tags to a target socket,
 * and the interaction is delegated to that target. Errors signalled by the target are propagated
 * to the caller unchanged.
 *
 * <p>If tag extraction or the locator call throws synchronously, the payload is released and the
 * exception is returned as an error signal.
 *
 * <p>NOTE(review): only synchronous failures release the payload. If the publisher returned by the
 * locator errors (or completes empty) the payload is never handed to a target and nothing in this
 * class releases it — confirm whether that case can occur and who owns the payload then.
 */
public class RoutingRSocket implements RSocket {
    private final RSocketLocator rSocketLocator;
    private final Function<Payload, Tags> tagsExtractor;

    /**
     * Creates a routing socket.
     *
     * @param rSocketLocator resolves extracted tags to the target socket
     * @param function extracts the routing tags from a request payload
     */
    public RoutingRSocket(RSocketLocator rSocketLocator, Function<Payload, Tags> function) {
        this.rSocketLocator = rSocketLocator;
        this.tagsExtractor = function;
    }

    /**
     * Routes a fire-and-forget request to the located socket.
     *
     * <p>Tag extraction and the locator call run when this method is invoked, not on subscription.
     *
     * @param payload request payload; released here if extraction/lookup throws synchronously
     * @return the target's completion signal, or an error if routing or the target fails
     */
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            Tags tags = this.tagsExtractor.apply(payload);
            return this.rSocketLocator.apply(tags)
                    .flatMap(rSocket -> rSocket.fireAndForget(payload));
        } catch (Throwable th) {
            payload.release();
            return Mono.error(th);
        }
    }

    /**
     * Routes a request-response interaction to the located socket.
     *
     * <p>Tag extraction and the locator call run when this method is invoked, not on subscription.
     *
     * @param payload request payload; released here if extraction/lookup throws synchronously
     * @return the target's response, or an error if routing or the target fails
     */
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            Tags tags = this.tagsExtractor.apply(payload);
            return this.rSocketLocator.apply(tags)
                    .flatMap(rSocket -> rSocket.requestResponse(payload));
        } catch (Throwable th) {
            payload.release();
            return Mono.error(th);
        }
    }

    /**
     * Routes a metadata-push to the located socket.
     *
     * <p>Tag extraction and the locator call run when this method is invoked, not on subscription.
     *
     * @param payload metadata payload; released here if extraction/lookup throws synchronously
     * @return the target's completion signal, or an error if routing or the target fails
     */
    public Mono<Void> metadataPush(Payload payload) {
        try {
            Tags tags = this.tagsExtractor.apply(payload);
            return this.rSocketLocator.apply(tags)
                    .flatMap(rSocket -> rSocket.metadataPush(payload));
        } catch (Throwable th) {
            payload.release();
            return Mono.error(th);
        }
    }

    /**
     * Routes a request-stream interaction to the located socket.
     *
     * <p>Tag extraction and the locator call run when this method is invoked, not on subscription.
     *
     * @param payload request payload; released here if extraction/lookup throws synchronously
     * @return the target's response stream, or an error if routing or the target fails
     */
    public Flux<Payload> requestStream(Payload payload) {
        try {
            Tags tags = this.tagsExtractor.apply(payload);
            return this.rSocketLocator.apply(tags)
                    .flatMapMany(rSocket -> rSocket.requestStream(payload));
        } catch (Throwable th) {
            payload.release();
            return Flux.error(th);
        }
    }

    /**
     * Routes a channel to the socket located from the tags of the first inbound payload.
     *
     * <p>Unlike the single-payload interactions, extraction and lookup happen inside the
     * {@code switchOnFirst} callback, i.e. once the first inbound signal is available.
     *
     * @param publisher inbound payload stream
     * @return the target's response stream; the inbound stream itself when its first signal
     *     carries no value; or an error if extraction/lookup throws
     */
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, flux) -> {
            if (!signal.hasValue()) {
                // First signal is not an element, so there is no payload to route on.
                return flux;
            }
            Payload payload = signal.get();
            // Extra reference taken before routing; the catch block below drops it again.
            payload.retain();
            try {
                Tags tags = this.tagsExtractor.apply(payload);
                // skip(1) drops the leading element of the inbound flux and startWith puts the
                // retained first payload in its place at the head of the forwarded stream.
                return this.rSocketLocator.apply(tags)
                        .flatMapMany(rSocket -> rSocket.requestChannel(flux.skip(1).startWith(payload)));
            } catch (Throwable th) {
                payload.release();
                return Flux.error(th);
            }
        });
    }
}
