package org.reactivecommons.async.rabbit;

import io.cloudevents.CloudEvent;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.api.From;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessageResult;

/* loaded from: input_file:org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.class */
public class RabbitDirectAsyncGateway implements DirectAsyncGateway {
    private final BrokerConfig config;
    private final ReactiveReplyRouter router;
    private final ReactiveMessageSender sender;
    private final String exchange;
    private final MessageConverter converter;
    private final boolean persistentCommands;
    private final boolean persistentQueries;
    private final Duration replyTimeout;
    private final MeterRegistry meterRegistry;

    public RabbitDirectAsyncGateway(BrokerConfig brokerConfig, ReactiveReplyRouter reactiveReplyRouter, ReactiveMessageSender reactiveMessageSender, String str, MessageConverter messageConverter, MeterRegistry meterRegistry) {
        this.config = brokerConfig;
        this.router = reactiveReplyRouter;
        this.sender = reactiveMessageSender;
        this.exchange = str;
        this.converter = messageConverter;
        this.persistentCommands = brokerConfig.isPersistentCommands();
        this.persistentQueries = brokerConfig.isPersistentQueries();
        this.replyTimeout = brokerConfig.getReplyTimeout();
        this.meterRegistry = meterRegistry;
    }

    public <T> Mono<Void> sendCommand(Command<T> command, String str) {
        return sendCommand(command, str, "app");
    }

    public <T> Mono<Void> sendCommand(Command<T> command, String str, String str2) {
        return resolveSender(str2).sendWithConfirm(command, this.exchange, str, Collections.emptyMap(), this.persistentCommands);
    }

    public Mono<Void> sendCommand(CloudEvent cloudEvent, String str) {
        return sendCommand(new Command(cloudEvent.getType(), cloudEvent.getId(), cloudEvent), str);
    }

    public Mono<Void> sendCommand(CloudEvent cloudEvent, String str, String str2) {
        return sendCommand(new Command(cloudEvent.getType(), cloudEvent.getId(), cloudEvent), str, str2);
    }

    public <T> Flux<OutboundMessageResult> sendCommands(Flux<Command<T>> flux, String str) {
        return this.sender.sendWithConfirmBatch(flux, this.exchange, str, Collections.emptyMap(), this.persistentCommands);
    }

    public <T, R> Mono<R> requestReply(AsyncQuery<T> asyncQuery, String str, Class<R> cls) {
        return requestReply(asyncQuery, str, cls, "app");
    }

    public <T, R> Mono<R> requestReply(AsyncQuery<T> asyncQuery, String str, Class<R> cls, String str2) {
        String replaceAll = UUID.randomUUID().toString().replaceAll("-", "");
        Mono flatMap = this.router.register(replaceAll).timeout(this.replyTimeout).doOnError(TimeoutException.class, timeoutException -> {
            this.router.deregister(replaceAll);
        }).flatMap(message -> {
            return Mono.fromCallable(() -> {
                return this.converter.readValue(message, cls);
            });
        });
        HashMap hashMap = new HashMap();
        hashMap.put("x-reply_id", this.config.getRoutingKey());
        hashMap.put("x-serveQuery-id", asyncQuery.getResource());
        hashMap.put("x-correlation-id", replaceAll);
        hashMap.put("x-reply-timeout-millis", Long.valueOf(this.replyTimeout.toMillis()));
        return resolveSender(str2).sendNoConfirm(asyncQuery, this.exchange, str + ".query", hashMap, this.persistentQueries).then(flatMap).name("async_query").tag("operation", asyncQuery.getResource()).tag("target", str).tap(Micrometer.metrics(this.meterRegistry));
    }

    public <R extends CloudEvent> Mono<R> requestReply(CloudEvent cloudEvent, String str, Class<R> cls) {
        return requestReply(new AsyncQuery(cloudEvent.getType(), cloudEvent), str, cls);
    }

    public <R extends CloudEvent> Mono<R> requestReply(CloudEvent cloudEvent, String str, Class<R> cls, String str2) {
        return requestReply(new AsyncQuery(cloudEvent.getType(), cloudEvent), str, cls, str2);
    }

    public <T> Mono<Void> reply(T t, From from) {
        HashMap hashMap = new HashMap();
        hashMap.put("x-correlation-id", from.getCorrelationID());
        if (t == null) {
            hashMap.put("x-empty-completion", Boolean.TRUE.toString());
        }
        return this.sender.sendNoConfirm(t, "globalReply", from.getReplyID(), hashMap, false);
    }

    protected ReactiveMessageSender resolveSender(String str) {
        return this.sender;
    }
}
