package org.reactivecommons.async.rabbit.communications;

import com.rabbitmq.client.AMQP;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.exceptions.SendFailureNoAckException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.Sender;

/* loaded from: input_file:org/reactivecommons/async/rabbit/communications/ReactiveMessageSender.class */
public class ReactiveMessageSender {
    private final Sender sender;
    private final String sourceApplication;
    private final MessageConverter messageConverter;
    private final TopologyCreator topologyCreator;
    private volatile FluxSink<OutboundMessage> fluxSinkNoConfirm;
    private final int numberOfSenderSubscriptions = 4;
    private final CopyOnWriteArrayList<FluxSink<MyOutboundMessage>> fluxSinkConfirm = new CopyOnWriteArrayList<>();
    private final AtomicLong counter = new AtomicLong();
    private final ExecutorService executorService = Executors.newFixedThreadPool(13, runnable -> {
        return new Thread(runnable, "RMessageSender1-" + this.counter.getAndIncrement());
    });
    private final ExecutorService executorService2 = Executors.newFixedThreadPool(13, runnable -> {
        return new Thread(runnable, "RMessageSender2-" + this.counter.getAndIncrement());
    });

    /* loaded from: input_file:org/reactivecommons/async/rabbit/communications/ReactiveMessageSender$AckNotifier.class */
    private static class AckNotifier implements Consumer<Boolean> {
        private final MonoSink<Void> monoSink;

        public AckNotifier(MonoSink<Void> monoSink) {
            this.monoSink = monoSink;
        }

        @Override // java.util.function.Consumer
        public void accept(Boolean bool) {
            if (bool.booleanValue()) {
                this.monoSink.success();
            } else {
                this.monoSink.error(new SendFailureNoAckException("No ACK when sending message"));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reactivecommons/async/rabbit/communications/ReactiveMessageSender$MyOutboundMessage.class */
    public static class MyOutboundMessage extends OutboundMessage {
        private final Consumer<Boolean> ackNotifier;

        public MyOutboundMessage(String str, String str2, AMQP.BasicProperties basicProperties, byte[] bArr, Consumer<Boolean> consumer) {
            super(str, str2, basicProperties, bArr);
            this.ackNotifier = consumer;
        }

        public Consumer<Boolean> getAckNotifier() {
            return this.ackNotifier;
        }
    }

    public ReactiveMessageSender(Sender sender, String str, MessageConverter messageConverter, TopologyCreator topologyCreator) {
        this.sender = sender;
        this.sourceApplication = str;
        this.messageConverter = messageConverter;
        this.topologyCreator = topologyCreator;
        for (int i = 0; i < 4; i++) {
            CopyOnWriteArrayList<FluxSink<MyOutboundMessage>> copyOnWriteArrayList = this.fluxSinkConfirm;
            copyOnWriteArrayList.getClass();
            sender.sendWithTypedPublishConfirms(Flux.create((v1) -> {
                r0.add(v1);
            })).doOnNext(outboundMessageResult -> {
                Consumer<Boolean> ackNotifier = ((MyOutboundMessage) outboundMessageResult.getOutboundMessage()).getAckNotifier();
                this.executorService.submit(() -> {
                    ackNotifier.accept(Boolean.valueOf(outboundMessageResult.isAck()));
                });
            }).subscribe();
        }
        sender.send(Flux.create(fluxSink -> {
            this.fluxSinkNoConfirm = fluxSink;
        })).subscribe();
    }

    public <T> Mono<Void> sendWithConfirm(T t, String str, String str2, Map<String, Object> map, boolean z) {
        return Mono.create(monoSink -> {
            MyOutboundMessage outboundMessage = toOutboundMessage(t, str, str2, map, new AckNotifier(monoSink), z);
            this.executorService2.submit(() -> {
                return this.fluxSinkConfirm.get((int) (System.currentTimeMillis() % 4)).next(outboundMessage);
            });
        });
    }

    public <T> Mono<Void> sendNoConfirm(T t, String str, String str2, Map<String, Object> map, boolean z) {
        this.fluxSinkNoConfirm.next(toOutboundMessage(t, str, str2, map, z));
        return Mono.empty();
    }

    public <T> Flux<OutboundMessageResult> sendWithConfirmBatch(Flux<T> flux, String str, String str2, Map<String, Object> map, boolean z) {
        Flux map2 = flux.map(obj -> {
            return toOutboundMessage(obj, str, str2, map, z);
        });
        Sender sender = this.sender;
        sender.getClass();
        return ((Flux) map2.as((v1) -> {
            return r1.sendWithPublishConfirms(v1);
        })).flatMap(outboundMessageResult -> {
            return outboundMessageResult.isAck() ? Mono.empty() : Mono.error(new SendFailureNoAckException("Event no ACK in communications"));
        });
    }

    private <T> MyOutboundMessage toOutboundMessage(T t, String str, String str2, Map<String, Object> map, Consumer<Boolean> consumer, boolean z) {
        Message message = this.messageConverter.toMessage(t);
        return new MyOutboundMessage(str, str2, buildMessageProperties(message, map, z), message.getBody(), consumer);
    }

    private <T> OutboundMessage toOutboundMessage(T t, String str, String str2, Map<String, Object> map, boolean z) {
        Message message = this.messageConverter.toMessage(t);
        return new OutboundMessage(str, str2, buildMessageProperties(message, map, z), message.getBody());
    }

    private AMQP.BasicProperties buildMessageProperties(Message message, Map<String, Object> map, boolean z) {
        Message.Properties properties = message.getProperties();
        HashMap hashMap = new HashMap(properties.getHeaders());
        hashMap.putAll(map);
        hashMap.put("sourceApplication", this.sourceApplication);
        return new AMQP.BasicProperties.Builder().contentType(properties.getContentType()).appId(this.sourceApplication).contentEncoding(properties.getContentEncoding()).deliveryMode(Integer.valueOf(z ? 2 : 1)).timestamp(new Date()).messageId(UUID.randomUUID().toString()).headers(hashMap).build();
    }

    public Sender getSender() {
        return this.sender;
    }

    public TopologyCreator getTopologyCreator() {
        return this.topologyCreator;
    }
}
