package org.springframework.cloud.fn.consumer.rsocket;

import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Auto-configuration that exposes a consumer which forwards message payloads to an
 * RSocket route.
 *
 * <p>It is ordered after {@link RSocketRequesterAutoConfiguration} and binds
 * {@link RsocketConsumerProperties}, from which the target URI (or host and port) and the
 * route are read.
 */
@EnableConfigurationProperties({RsocketConsumerProperties.class})
@AutoConfiguration(after = {RSocketRequesterAutoConfiguration.class})
public class RsocketConsumerConfiguration {

    /**
     * Adapts the reactive {@code rsocketFunctionConsumer} function to a {@link Consumer}.
     *
     * <p>The returned consumer applies the function to the incoming flux and blocks the
     * calling thread until the resulting {@link Mono} terminates.
     *
     * @param function the bean qualified as {@code rsocketFunctionConsumer}
     * @return a consumer that subscribes to the function's result and waits for it
     */
    @Bean
    public Consumer<Flux<Message<?>>> rsocketConsumer(@Qualifier("rsocketFunctionConsumer") Function<Flux<Message<?>>, Mono<Void>> function) {
        // apply() is already typed as Mono<Void>; no cast is needed before blocking.
        return flux -> function.apply(flux).block();
    }

    /**
     * Creates the function that sends the payload of every incoming message to the
     * configured route.
     *
     * <p>The requester is built once, when this bean method runs: over WebSocket when
     * {@code rsocketConsumerProperties.getUri()} is non-null, otherwise over TCP using the
     * configured host and port.
     *
     * @param builder the requester builder used to create the {@link RSocketRequester}
     * @param rsocketConsumerProperties source of the URI, host, port and route
     * @return a function mapping a flux of messages to a {@link Mono} that emits no
     *         elements and only propagates the termination signal of the sends
     */
    @Bean
    public Function<Flux<Message<?>>, Mono<Void>> rsocketFunctionConsumer(RSocketRequester.Builder builder, RsocketConsumerProperties rsocketConsumerProperties) {
        final RSocketRequester requester;
        if (rsocketConsumerProperties.getUri() != null) {
            requester = builder.websocket(rsocketConsumerProperties.getUri());
        }
        else {
            requester = builder.tcp(rsocketConsumerProperties.getHost(), rsocketConsumerProperties.getPort());
        }
        // Read the route once so the per-message lambda does not touch the properties bean.
        final String route = rsocketConsumerProperties.getRoute();
        return flux -> flux
                .flatMap(message -> requester.route(route).data(message.getPayload()).send())
                .ignoreElements();
    }
}
