/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.impl;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.impl.communications.Message;
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
import org.reactivecommons.async.impl.config.BrokerConfig;
import org.reactivecommons.async.impl.converters.MessageConverter;
import org.reactivecommons.async.impl.reply.ReactiveReplyRouter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessageResult;

/**
 * {@link DirectAsyncGateway} implementation that hands commands and queries to a
 * {@link ReactiveMessageSender} for publication on a fixed exchange, and obtains
 * query replies from a {@link ReactiveReplyRouter} keyed by a per-request
 * correlation id.
 *
 * <p>The persistence flags and the reply timeout are read from the
 * {@link BrokerConfig} once, at construction time.
 */
public class RabbitDirectAsyncGateway implements DirectAsyncGateway {
    /** Header carrying the routing key read from {@link BrokerConfig#getRoutingKey()}. */
    private static final String REPLY_ID_HEADER = "x-reply_id";
    /** Header carrying the value of {@link AsyncQuery#getResource()}. */
    private static final String SERVED_QUERY_ID_HEADER = "x-serveQuery-id";
    /** Header carrying the generated correlation id. */
    private static final String CORRELATION_ID_HEADER = "x-correlation-id";
    /** Suffix appended to the target name when a query is sent. */
    private static final String QUERY_ROUTING_SUFFIX = ".query";

    private final BrokerConfig config;
    private final ReactiveReplyRouter router;
    private final ReactiveMessageSender sender;
    private final String exchange;
    private final MessageConverter converter;
    private final boolean persistentCommands;
    private final boolean persistentQueries;
    private final Duration replyTimeout;

    /**
     * Creates a gateway bound to one exchange.
     *
     * @param config    source of the persistence flags, the reply timeout and the
     *                  routing key placed in the reply header; must not be {@code null}
     *                  (it is dereferenced here)
     * @param router    router on which reply correlation ids are registered
     * @param sender    sender that every outgoing message is delegated to
     * @param exchange  exchange name passed to the sender on every call
     * @param converter converter used to turn a reply {@link Message} into the
     *                  requested type
     */
    public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ReactiveMessageSender sender, String exchange, MessageConverter converter) {
        this.config = config;
        this.router = router;
        this.sender = sender;
        this.exchange = exchange;
        this.converter = converter;
        this.persistentCommands = config.isPersistentCommands();
        this.persistentQueries = config.isPersistentQueries();
        this.replyTimeout = config.getReplyTimeout();
    }

    /**
     * Sends one command addressed to {@code targetName}, with no extra headers,
     * by delegating to {@link ReactiveMessageSender#sendWithConfirm}.
     *
     * @param command    command to send
     * @param targetName destination name, forwarded unchanged to the sender
     * @param <T>        command payload type
     * @return the {@code Mono} produced by the sender
     */
    public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
        return this.sender.sendWithConfirm(command, this.exchange, targetName, Collections.emptyMap(), this.persistentCommands);
    }

    /**
     * Sends a stream of commands addressed to {@code targetName}, with no extra
     * headers, by delegating to {@link ReactiveMessageSender#sendWithConfirmBatch}.
     *
     * @param commands   commands to send
     * @param targetName destination name, forwarded unchanged to the sender
     * @param <T>        command payload type
     * @return the per-message results produced by the sender
     */
    public <T> Flux<OutboundMessageResult> sendCommands(Flux<Command<T>> commands, String targetName) {
        return this.sender.sendWithConfirmBatch(commands, this.exchange, targetName, Collections.emptyMap(), this.persistentCommands);
    }

    /**
     * Sends a query to {@code targetName + ".query"} and resolves with the reply
     * converted to {@code type}.
     *
     * <p>Nothing is registered or sent until the returned {@code Mono} is
     * subscribed. Each subscription generates its own correlation id, registers it
     * with the router, and only then sends the query, so re-subscribing (for
     * example through {@code retry}) issues an independent request.
     *
     * <p>The reply is awaited for at most the configured reply timeout, applied
     * with {@link Mono#timeout(Duration)}; the wait is subscribed once the send
     * {@code Mono} completes.
     *
     * @param query      query to send; its resource is placed in a header
     * @param targetName destination name; the query suffix is appended to it
     * @param type       class the reply message is converted to
     * @param <T>        query payload type
     * @param <R>        reply type
     * @return a {@code Mono} emitting the converted reply
     * @throws NullPointerException if {@code query} is {@code null} (raised at call
     *                              time, before anything is registered)
     */
    public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type) {
        // Fail at call time for a null query, as callers could already observe,
        // instead of turning it into an error signal inside the deferred pipeline.
        Objects.requireNonNull(query, "query");
        return Mono.defer(() -> {
            String correlationId = newCorrelationId();
            // Register before sending so the reply slot exists by the time the
            // query leaves; the cast is kept because the element type returned by
            // register(...) is not visible from this class.
            Mono<R> reply = this.router.register(correlationId)
                    .timeout(this.replyTimeout)
                    .flatMap(message -> Mono.fromCallable(() -> this.converter.readValue((Message) message, type)));
            Map<String, Object> headers = new HashMap<>();
            headers.put(REPLY_ID_HEADER, this.config.getRoutingKey());
            headers.put(SERVED_QUERY_ID_HEADER, query.getResource());
            headers.put(CORRELATION_ID_HEADER, correlationId);
            return this.sender
                    .sendNoConfirm(query, this.exchange, targetName + QUERY_ROUTING_SUFFIX, headers, this.persistentQueries)
                    .then(reply);
        });
    }

    /** Returns a random UUID rendered as 32 hex characters, without dashes. */
    private static String newCorrelationId() {
        // Literal replace: no regular expression is needed to strip the dashes.
        return UUID.randomUUID().toString().replace("-", "");
    }
}

