package reactor.rabbitmq.samples;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.RabbitFlux;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

@SpringBootApplication
/* loaded from: input_file:reactor/rabbitmq/samples/SpringBootSample.class */
public class SpringBootSample {
    static final String QUEUE = "reactor.rabbitmq.spring.boot";
    private static final Logger LOGGER = LoggerFactory.getLogger(SpringBootSample.class);

    @Component
    /* loaded from: input_file:reactor/rabbitmq/samples/SpringBootSample$Runner.class */
    static class Runner implements CommandLineRunner {
        final Sender sender;
        final Flux<Delivery> deliveryFlux;
        final AtomicBoolean latchCompleted = new AtomicBoolean(false);

        Runner(Sender sender, Flux<Delivery> flux) {
            this.sender = sender;
            this.deliveryFlux = flux;
        }

        public void run(String... strArr) throws Exception {
            CountDownLatch countDownLatch = new CountDownLatch(10);
            this.deliveryFlux.subscribe(delivery -> {
                SpringBootSample.LOGGER.info("Received message {}", new String(delivery.getBody()));
                countDownLatch.countDown();
            });
            SpringBootSample.LOGGER.info("Sending messages...");
            this.sender.send(Flux.range(1, 10).map(num -> {
                return new OutboundMessage("", SpringBootSample.QUEUE, ("Message_" + num).getBytes());
            })).subscribe();
            this.latchCompleted.set(countDownLatch.await(5L, TimeUnit.SECONDS));
        }
    }

    public static void main(String[] strArr) {
        SpringApplication.run(SpringBootSample.class, strArr).close();
    }

    @Bean
    Queue queue() {
        return new Queue(QUEUE);
    }

    @Bean
    Mono<Connection> connectionMono(ConnectionFactory connectionFactory) {
        return Mono.fromCallable(() -> {
            return connectionFactory.createConnection().getDelegate();
        }).cache();
    }

    @Bean
    Sender sender(Mono<Connection> mono) {
        return RabbitFlux.createSender(new SenderOptions().connectionMono(mono));
    }

    @Bean
    Receiver receiver(Mono<Connection> mono) {
        return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(mono));
    }

    @Bean
    Flux<Delivery> deliveryFlux(Receiver receiver) {
        return receiver.consumeNoAck(QUEUE);
    }
}
