/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.impl.communications;

import com.rabbitmq.client.AMQP;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.reactivecommons.async.impl.communications.Message;
import org.reactivecommons.async.impl.communications.TopologyCreator;
import org.reactivecommons.async.impl.converters.MessageConverter;
import org.reactivecommons.async.impl.exceptions.SendFailureNoAckException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.Sender;

/**
 * Publishes messages to RabbitMQ through a reactor-rabbitmq {@link Sender}.
 *
 * <p>At construction time this class subscribes a fixed number of long-lived
 * publish-confirm pipelines plus one pipeline without confirms. Individual
 * messages are later pushed into those pipelines through the {@link FluxSink}s
 * captured when each pipeline's source is subscribed.
 *
 * <p>Two fixed thread pools are created per instance: one dispatches broker
 * confirmations to the waiting callers, the other pushes outbound messages
 * into the confirm sinks. Neither pool is shut down by this class.
 */
public class ReactiveMessageSender {
    /** Number of long-lived publish-confirm pipelines subscribed by the constructor. */
    private static final int NUMBER_OF_SENDER_SUBSCRIPTIONS = 4;
    /** Number of threads in each of the two executors below. */
    private static final int EXECUTOR_POOL_SIZE = 13;

    private final Sender sender;
    private final String sourceApplication;
    private final MessageConverter messageConverter;
    private final TopologyCreator topologyCreator;
    /** Sinks feeding the publish-confirm pipelines; filled as each pipeline source is subscribed. */
    private final CopyOnWriteArrayList<FluxSink<MyOutboundMessage>> fluxSinkConfirm = new CopyOnWriteArrayList<>();
    /** Sink feeding the no-confirm pipeline; stays {@code null} until that source is subscribed. */
    private volatile FluxSink<OutboundMessage> fluxSinkNoConfirm;
    /** Shared by both thread factories to number the threads they create. */
    private final AtomicLong counter = new AtomicLong();
    /** Runs the ack/nack callbacks of confirmed messages. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(EXECUTOR_POOL_SIZE, r -> new Thread(r, "RMessageSender1-" + this.counter.getAndIncrement()));
    /** Pushes outbound messages into the confirm sinks. */
    private final ExecutorService executorService2 = Executors.newFixedThreadPool(EXECUTOR_POOL_SIZE, r -> new Thread(r, "RMessageSender2-" + this.counter.getAndIncrement()));

    /**
     * Creates the sender and immediately subscribes its internal pipelines.
     *
     * @param sender            reactor-rabbitmq sender used for every publish
     * @param sourceApplication value written to the {@code appId} property and the
     *                          {@code sourceApplication} header of every message
     * @param messageConverter  converts payload objects into {@link Message}s
     * @param topologyCreator   exposed unchanged through {@link #getTopologyCreator()}
     */
    public ReactiveMessageSender(Sender sender, String sourceApplication, MessageConverter messageConverter, TopologyCreator topologyCreator) {
        this.sender = sender;
        this.sourceApplication = sourceApplication;
        this.messageConverter = messageConverter;
        this.topologyCreator = topologyCreator;
        for (int i = 0; i < NUMBER_OF_SENDER_SUBSCRIPTIONS; ++i) {
            // Each pipeline registers its own sink in fluxSinkConfirm once its source is subscribed.
            final Flux<MyOutboundMessage> messageSource = Flux.create(this.fluxSinkConfirm::add);
            sender.sendWithTypedPublishConfirms(messageSource).doOnNext((OutboundMessageResult<MyOutboundMessage> outboundMessageResult) -> {
                final Consumer<Boolean> ackNotifier = outboundMessageResult.getOutboundMessage().getAckNotifier();
                // Hand the callback to a separate pool so it does not run on the thread delivering confirms.
                this.executorService.submit(() -> ackNotifier.accept(outboundMessageResult.isAck()));
            }).subscribe();
        }
        final Flux<OutboundMessage> messageSourceNoConfirm = Flux.create(fluxSink -> {
            this.fluxSinkNoConfirm = fluxSink;
        });
        sender.send(messageSourceNoConfirm).subscribe();
    }

    /**
     * Sends one message and completes when the broker confirms it.
     *
     * <p>Work starts on subscription: the payload is converted and then handed to
     * one of the confirm pipelines on a separate thread.
     *
     * @param message    payload to convert and publish
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param headers    extra headers merged over the converter's headers; may be {@code null}
     * @param persistent {@code true} for delivery mode 2, {@code false} for delivery mode 1
     * @return a {@code Mono} that completes on ack, fails with
     *         {@link SendFailureNoAckException} on nack, and fails with the underlying
     *         exception when the message cannot be handed to a confirm pipeline
     */
    public <T> Mono<Void> sendWithConfirm(T message, String exchange, String routingKey, Map<String, Object> headers, boolean persistent) {
        return Mono.create(monoSink -> {
            final Consumer<Boolean> notifier = new AckNotifier(monoSink);
            final MyOutboundMessage outboundMessage = this.toOutboundMessage(message, exchange, routingKey, headers, notifier, persistent);
            this.executorService2.submit(() -> {
                try {
                    // Spread messages over the pipelines using the clock as a cheap selector.
                    final int index = (int) (System.currentTimeMillis() % NUMBER_OF_SENDER_SUBSCRIPTIONS);
                    this.fluxSinkConfirm.get(index).next(outboundMessage);
                } catch (RuntimeException e) {
                    // Without this the exception would be swallowed by the discarded Future
                    // (e.g. when the selected sink is not registered yet) and the caller's
                    // Mono would never terminate.
                    monoSink.error(e);
                }
            });
        });
    }

    /**
     * Sends one message without waiting for a broker confirmation.
     *
     * <p>The message is converted and emitted eagerly when this method is called,
     * not when the returned {@code Mono} is subscribed. If the no-confirm sink has
     * not been assigned yet, the call fails with a {@link NullPointerException}.
     *
     * @param message    payload to convert and publish
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param headers    extra headers merged over the converter's headers; may be {@code null}
     * @param persistent {@code true} for delivery mode 2, {@code false} for delivery mode 1
     * @return an already-completed empty {@code Mono}
     */
    public <T> Mono<Void> sendNoConfirm(T message, String exchange, String routingKey, Map<String, Object> headers, boolean persistent) {
        this.fluxSinkNoConfirm.next(this.toOutboundMessage(message, exchange, routingKey, headers, persistent));
        return Mono.empty();
    }

    /**
     * Sends a stream of messages with publish confirms through a dedicated call
     * to the underlying {@link Sender} (not through the shared pipelines).
     *
     * <p>Acknowledged results are dropped, so the returned {@code Flux} emits no
     * items; it fails with {@link SendFailureNoAckException} when a result is not acked.
     *
     * @param messages   payloads to convert and publish
     * @param exchange   target exchange for every message
     * @param routingKey routing key for every message
     * @param headers    extra headers merged over the converter's headers; may be {@code null}
     * @param persistent {@code true} for delivery mode 2, {@code false} for delivery mode 1
     * @return a {@code Flux} signalling completion or failure of the batch
     */
    public <T> Flux<OutboundMessageResult> sendWithConfirmBatch(Flux<T> messages, String exchange, String routingKey, Map<String, Object> headers, boolean persistent) {
        return messages.map(message -> this.toOutboundMessage(message, exchange, routingKey, headers, persistent))
                .as(this.sender::sendWithPublishConfirms)
                .flatMap(result -> result.isAck()
                        ? Mono.empty()
                        : Mono.error(new SendFailureNoAckException("Event no ACK in communications")));
    }

    /** Converts a payload into an outbound message that carries the callback to run on ack/nack. */
    private <T> MyOutboundMessage toOutboundMessage(T object, String exchange, String routingKey, Map<String, Object> headers, Consumer<Boolean> ackNotifier, boolean persistent) {
        final Message message = this.messageConverter.toMessage(object);
        final AMQP.BasicProperties props = this.buildMessageProperties(message, headers, persistent);
        return new MyOutboundMessage(exchange, routingKey, props, message.getBody(), ackNotifier);
    }

    /** Converts a payload into a plain outbound message (no ack callback attached). */
    private <T> OutboundMessage toOutboundMessage(T object, String exchange, String routingKey, Map<String, Object> headers, boolean persistent) {
        final Message message = this.messageConverter.toMessage(object);
        final AMQP.BasicProperties props = this.buildMessageProperties(message, headers, persistent);
        return new OutboundMessage(exchange, routingKey, props, message.getBody());
    }

    /**
     * Builds the AMQP properties for a converted message.
     *
     * <p>Header precedence, lowest to highest: the converter's headers, the
     * caller-supplied {@code headers}, then the {@code sourceApplication} header.
     * A fresh timestamp and a random UUID message id are assigned on every call.
     *
     * @param message    converted message supplying content type, encoding and base headers
     * @param headers    caller-supplied headers; {@code null} is treated as no extra headers
     * @param persistent {@code true} for delivery mode 2, {@code false} for delivery mode 1
     */
    private AMQP.BasicProperties buildMessageProperties(Message message, Map<String, Object> headers, boolean persistent) {
        final Message.Properties properties = message.getProperties();
        final Map<String, Object> baseHeaders = new HashMap<>(properties.getHeaders());
        if (headers != null) {
            baseHeaders.putAll(headers);
        }
        baseHeaders.put("sourceApplication", this.sourceApplication);
        return new AMQP.BasicProperties.Builder()
                .contentType(properties.getContentType())
                .appId(this.sourceApplication)
                .contentEncoding(properties.getContentEncoding())
                .deliveryMode(persistent ? 2 : 1)
                .timestamp(new Date())
                .messageId(UUID.randomUUID().toString())
                .headers(baseHeaders)
                .build();
    }

    /** @return the underlying reactor-rabbitmq sender */
    public Sender getSender() {
        return this.sender;
    }

    /** @return the topology creator supplied at construction */
    public TopologyCreator getTopologyCreator() {
        return this.topologyCreator;
    }

    /** Outbound message that carries the callback to invoke once its publish result is known. */
    static class MyOutboundMessage
    extends OutboundMessage {
        private final Consumer<Boolean> ackNotifier;

        public MyOutboundMessage(String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body, Consumer<Boolean> ackNotifier) {
            super(exchange, routingKey, properties, body);
            this.ackNotifier = ackNotifier;
        }

        public Consumer<Boolean> getAckNotifier() {
            return this.ackNotifier;
        }
    }

    /** Translates a publish result into the terminal signal of the caller's {@code Mono}. */
    private static class AckNotifier
    implements Consumer<Boolean> {
        private final MonoSink<Void> monoSink;

        public AckNotifier(MonoSink<Void> monoSink) {
            this.monoSink = monoSink;
        }

        /**
         * Completes the sink on ack, otherwise fails it.
         *
         * @param ack {@code true} when the message was acked; {@code false} or
         *            {@code null} is treated as not acked
         */
        @Override
        public void accept(Boolean ack) {
            if (Boolean.TRUE.equals(ack)) {
                this.monoSink.success();
            } else {
                this.monoSink.error(new SendFailureNoAckException("No ACK when sending message"));
            }
        }
    }
}

