package org.reactivecommons.async.impl.communications;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.time.Duration;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;

/* loaded from: input_file:org/reactivecommons/async/impl/communications/TopologyCreator.class */
public class TopologyCreator {

    @Generated
    private static final Logger log = Logger.getLogger(TopologyCreator.class.getName());

    /** Channel source shared by every declare/bind operation of this instance. */
    private final Mono<Channel> channel;

    /**
     * Builds the channel pipeline on top of the supplied connection.
     *
     * <p>A channel is created from the connection when the pipeline is subscribed.
     * Every failure is logged at {@code SEVERE}, the creation is retried with
     * backoff (5 retries, first backoff 500 ms), and the final outcome is cached.
     *
     * @param mono source of the broker connection used to open the channel
     */
    public TopologyCreator(Mono<Connection> mono) {
        Mono<Channel> opened = mono.map(TopologyCreator::openChannel);
        // Logging sits upstream of the retry so that each failed attempt is reported.
        Mono<Channel> logged = opened.doOnError(failure -> log.log(Level.SEVERE, failure.getMessage(), failure));
        this.channel = logged.retryBackoff(5L, Duration.ofMillis(500L)).cache();
    }

    /**
     * Declares an exchange using the name, type, durable, auto-delete and
     * arguments values of the given specification.
     *
     * @param exchangeSpecification description of the exchange to declare
     * @return the broker's declare confirmation; the Mono errors with
     *         {@link TopologyDefException} if the client raises an {@link IOException}
     */
    public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchangeSpecification) {
        return this.channel.map(ch -> declareExchange(ch, exchangeSpecification));
    }

    /**
     * Declares a queue using the name, durable, exclusive, auto-delete and
     * arguments values of the given specification.
     *
     * @param queueSpecification description of the queue to declare
     * @return the broker's declare confirmation; the Mono errors with
     *         {@link TopologyDefException} if the client raises an {@link IOException}
     */
    public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queueSpecification) {
        return this.channel.map(ch -> declareQueue(ch, queueSpecification));
    }

    /**
     * Binds a queue to an exchange using the queue, exchange, routing key and
     * arguments values of the given specification.
     *
     * @param bindingSpecification description of the binding to create
     * @return the broker's bind confirmation; the Mono errors with
     *         {@link TopologyDefException} if the client raises an {@link IOException}
     */
    public Mono<AMQP.Queue.BindOk> bind(BindingSpecification bindingSpecification) {
        return this.channel.map(ch -> bindQueue(ch, bindingSpecification));
    }

    /** Opens a channel on the connection, wrapping the checked I/O failure. */
    private static Channel openChannel(Connection connection) {
        try {
            return connection.createChannel();
        } catch (IOException cause) {
            throw new TopologyDefException("Fail to create channel", cause);
        }
    }

    /** Issues the exchange declaration on the channel, wrapping the checked I/O failure. */
    private static AMQP.Exchange.DeclareOk declareExchange(Channel ch, ExchangeSpecification spec) {
        try {
            return ch.exchangeDeclare(
                    spec.getName(),
                    spec.getType(),
                    spec.isDurable(),
                    spec.isAutoDelete(),
                    spec.getArguments());
        } catch (IOException cause) {
            throw new TopologyDefException("Fail to declare exchange: " + spec.getName(), cause);
        }
    }

    /** Issues the queue declaration on the channel, wrapping the checked I/O failure. */
    private static AMQP.Queue.DeclareOk declareQueue(Channel ch, QueueSpecification spec) {
        try {
            return ch.queueDeclare(
                    spec.getName(),
                    spec.isDurable(),
                    spec.isExclusive(),
                    spec.isAutoDelete(),
                    spec.getArguments());
        } catch (IOException cause) {
            throw new TopologyDefException("Fail to declare queue: " + spec.getName(), cause);
        }
    }

    /** Issues the queue binding on the channel, wrapping the checked I/O failure. */
    private static AMQP.Queue.BindOk bindQueue(Channel ch, BindingSpecification spec) {
        try {
            return ch.queueBind(
                    spec.getQueue(),
                    spec.getExchange(),
                    spec.getRoutingKey(),
                    spec.getArguments());
        } catch (IOException cause) {
            throw new TopologyDefException("Fail to bind queue: " + spec.getQueue(), cause);
        }
    }

    /**
     * Unchecked wrapper for I/O failures raised while creating the channel or
     * defining topology elements; always carries the original cause.
     */
    public static class TopologyDefException extends RuntimeException {
        public TopologyDefException(String str, Throwable th) {
            super(str, th);
        }
    }
}
