package org.iris_events.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.ReturnCallback;
import com.rabbitmq.client.ReturnListener;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.transaction.RollbackException;
import jakarta.transaction.Synchronization;
import jakarta.transaction.SystemException;
import jakarta.transaction.Transaction;
import jakarta.transaction.TransactionManager;
import jakarta.validation.constraints.NotNull;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.iris_events.annotations.CachedMessage;
import org.iris_events.annotations.ExchangeType;
import org.iris_events.annotations.Scope;
import org.iris_events.asyncapi.parsers.CacheableTtlParser;
import org.iris_events.asyncapi.parsers.ExchangeParser;
import org.iris_events.asyncapi.parsers.ExchangeTypeParser;
import org.iris_events.asyncapi.parsers.MessageScopeParser;
import org.iris_events.asyncapi.parsers.PersistentParser;
import org.iris_events.asyncapi.parsers.RoutingKeyParser;
import org.iris_events.common.Exchanges;
import org.iris_events.common.message.ResourceMessage;
import org.iris_events.context.EventContext;
import org.iris_events.exception.IrisSendException;
import org.iris_events.exception.IrisTransactionException;
import org.iris_events.producer.RoutingDetails;
import org.iris_events.runtime.BasicPropertiesProvider;
import org.iris_events.runtime.channel.ChannelKey;
import org.iris_events.runtime.channel.ChannelService;
import org.iris_events.runtime.configuration.IrisRabbitMQConfig;
import org.iris_events.tx.TransactionCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/iris_events/producer/EventProducer.class */
public class EventProducer {
    private static final Logger log = LoggerFactory.getLogger(EventProducer.class);
    public static final String SERVICE_ID_UNAVAILABLE_FALLBACK = "N/A";
    private static final long WAIT_TIMEOUT_MILLIS = 2000;
    private static final String RESOURCE = "resource";
    private final ChannelService channelService;
    private final ObjectMapper objectMapper;
    private final EventContext eventContext;
    private final IrisRabbitMQConfig config;
    private final TransactionManager transactionManager;
    private final BasicPropertiesProvider basicPropertiesProvider;
    private final AtomicInteger count = new AtomicInteger(0);
    private final Object lock = new Object();
    private final Map<Transaction, List<Message>> transactionDelayedMessages = new ConcurrentHashMap();
    private TransactionCallback transactionCallback;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.iris_events.producer.EventProducer$1, reason: invalid class name */
    /* loaded from: input_file:org/iris_events/producer/EventProducer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$iris_events$annotations$Scope = new int[Scope.values().length];

        static {
            try {
                $SwitchMap$org$iris_events$annotations$Scope[Scope.INTERNAL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$iris_events$annotations$Scope[Scope.USER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$iris_events$annotations$Scope[Scope.SESSION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$iris_events$annotations$Scope[Scope.BROADCAST.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/iris_events/producer/EventProducer$ProducerSynchronization.class */
    public class ProducerSynchronization implements Synchronization {
        private final Transaction tx;

        public ProducerSynchronization(Transaction transaction) {
            this.tx = transaction;
        }

        public void beforeCompletion() {
            if (EventProducer.this.transactionCallback != null) {
                EventProducer.this.transactionCallback.beforeCompletion(EventProducer.this.transactionDelayedMessages.get(this.tx));
            }
        }

        public void afterCompletion(int i) {
            boolean z = false;
            try {
                if (i == 3) {
                    try {
                        EventProducer.this.executeTxPublish(this.tx);
                        z = true;
                    } catch (IOException | IrisSendException e) {
                        EventProducer.log.error("Exception completing send transaction.", e);
                        throw new IrisTransactionException("Exception completing send transaction");
                    }
                }
            } finally {
                if (EventProducer.this.transactionCallback != null) {
                    EventProducer.this.transactionCallback.afterCompletion(EventProducer.this.transactionDelayedMessages.get(this.tx), i, z);
                }
                EventProducer.this.transactionDelayedMessages.remove(this.tx);
            }
        }
    }

    @Inject
    public EventProducer(@Named("producerChannelService") ChannelService channelService, ObjectMapper objectMapper, EventContext eventContext, IrisRabbitMQConfig irisRabbitMQConfig, TransactionManager transactionManager, BasicPropertiesProvider basicPropertiesProvider) {
        this.channelService = channelService;
        this.objectMapper = objectMapper;
        this.eventContext = eventContext;
        this.config = irisRabbitMQConfig;
        this.transactionManager = transactionManager;
        this.basicPropertiesProvider = basicPropertiesProvider;
    }

    public void send(Object obj) throws IrisSendException, IrisTransactionException {
        doSend(obj, null);
    }

    public void send(Object obj, String str) throws IrisSendException, IrisTransactionException {
        doSend(obj, str);
    }

    public void sendToSubscription(Object obj, String str, String str2) throws IrisSendException, IrisTransactionException {
        org.iris_events.annotations.Message messageAnnotation = getMessageAnnotation(obj);
        if (str == null || str.isBlank()) {
            throw new IrisSendException("Resource type is required for subscription event!");
        }
        String fromAnnotationClass = ExchangeParser.getFromAnnotationClass(messageAnnotation);
        String format = String.format("%s.%s", fromAnnotationClass, RESOURCE);
        publish(new ResourceMessage(str, str2, obj), new RoutingDetails.Builder().eventName(fromAnnotationClass).exchange(Exchanges.SUBSCRIPTION.getValue()).exchangeType(ExchangeType.TOPIC).routingKey(format).scope(null).persistent(PersistentParser.getFromAnnotationClass(messageAnnotation)).cacheTtl((Integer) getCachedAnnotation(obj).map(CacheableTtlParser::getFromAnnotationClass).orElse(null)).build());
    }

    private void doSend(Object obj, String str) throws IrisSendException {
        org.iris_events.annotations.Message messageAnnotation = getMessageAnnotation(obj);
        Scope fromAnnotationClass = MessageScopeParser.getFromAnnotationClass(messageAnnotation);
        switch (AnonymousClass1.$SwitchMap$org$iris_events$annotations$Scope[fromAnnotationClass.ordinal()]) {
            case 1:
                publish(obj, getRoutingDetailsFromAnnotation(messageAnnotation, fromAnnotationClass, str));
                return;
            case 2:
            case 3:
            case 4:
                publish(obj, getRoutingDetailsForClientScope(messageAnnotation, fromAnnotationClass, str));
                return;
            default:
                throw new IrisSendException("Message scope " + fromAnnotationClass + " not supported!");
        }
    }

    private RoutingDetails getRoutingDetailsFromAnnotation(org.iris_events.annotations.Message message, Scope scope, String str) {
        ExchangeType fromAnnotationClass = ExchangeTypeParser.getFromAnnotationClass(message);
        String fromAnnotationClass2 = ExchangeParser.getFromAnnotationClass(message);
        String routingKey = getRoutingKey(message, fromAnnotationClass);
        return new RoutingDetails.Builder().eventName(fromAnnotationClass2).exchange(fromAnnotationClass2).exchangeType(fromAnnotationClass).routingKey(routingKey).scope(scope).userId(str).persistent(PersistentParser.getFromAnnotationClass(message)).build();
    }

    private RoutingDetails getRoutingDetailsForClientScope(org.iris_events.annotations.Message message, Scope scope, String str) {
        String str2 = (String) Optional.ofNullable(str).map(str3 -> {
            return Exchanges.USER.getValue();
        }).orElseGet(() -> {
            switch (AnonymousClass1.$SwitchMap$org$iris_events$annotations$Scope[scope.ordinal()]) {
                case 2:
                    return Exchanges.USER.getValue();
                case 3:
                    return Exchanges.SESSION.getValue();
                case 4:
                    return Exchanges.BROADCAST.getValue();
                default:
                    throw new IrisSendException("Message scope " + scope + " not supported!");
            }
        });
        String fromAnnotationClass = ExchangeParser.getFromAnnotationClass(message);
        return new RoutingDetails.Builder().eventName(fromAnnotationClass).exchange(str2).exchangeType(ExchangeType.TOPIC).routingKey(String.format("%s.%s", fromAnnotationClass, str2)).scope(scope).userId(str).persistent(PersistentParser.getFromAnnotationClass(message)).build();
    }

    private org.iris_events.annotations.Message getMessageAnnotation(Object obj) {
        if (obj == null) {
            throw new IrisSendException("Null message can not be published!");
        }
        return (org.iris_events.annotations.Message) Optional.ofNullable(obj.getClass().getAnnotation(org.iris_events.annotations.Message.class)).orElseThrow(() -> {
            return new IrisSendException("Message annotation is required.");
        });
    }

    private Optional<CachedMessage> getCachedAnnotation(Object obj) {
        return obj == null ? Optional.empty() : Optional.ofNullable(obj.getClass().getAnnotation(CachedMessage.class));
    }

    public void addReturnListener(@NotNull String str, @NotNull ReturnListener returnListener) throws IOException {
        Objects.requireNonNull(returnListener, "Return listener can not be null");
        Channel orCreateChannelById = this.channelService.getOrCreateChannelById(str);
        orCreateChannelById.clearReturnListeners();
        orCreateChannelById.addReturnListener(returnListener);
    }

    public void addReturnCallback(@NotNull String str, @NotNull ReturnCallback returnCallback) throws IOException {
        Objects.requireNonNull(returnCallback, "Return callback can not be null");
        Channel orCreateChannelById = this.channelService.getOrCreateChannelById(str);
        orCreateChannelById.clearReturnListeners();
        orCreateChannelById.addReturnListener(returnCallback);
    }

    public void addConfirmListener(@NotNull String str, @NotNull ConfirmListener confirmListener) throws IOException {
        Objects.requireNonNull(confirmListener, "Confirm listener can not be null");
        Channel orCreateChannelById = this.channelService.getOrCreateChannelById(str);
        orCreateChannelById.clearConfirmListeners();
        orCreateChannelById.addConfirmListener(confirmListener);
    }

    public void registerTransactionCallback(TransactionCallback transactionCallback) {
        this.transactionCallback = transactionCallback;
    }

    public void publish(@NotNull Object obj, RoutingDetails routingDetails) throws IrisSendException {
        SendMessageValidator.validate(routingDetails);
        Optional<Transaction> optionalTransaction = getOptionalTransaction();
        if (!optionalTransaction.isPresent()) {
            executePublish(obj, routingDetails);
            return;
        }
        Transaction transaction = optionalTransaction.get();
        enqueueDelayedMessage(obj, routingDetails, transaction);
        registerDefaultTransactionCallback(transaction);
    }

    private void enqueueDelayedMessage(Object obj, RoutingDetails routingDetails, Transaction transaction) {
        this.transactionDelayedMessages.computeIfAbsent(transaction, transaction2 -> {
            return new LinkedList();
        }).add(new Message(obj, routingDetails, this.basicPropertiesProvider.getOrCreateAmqpBasicProperties(routingDetails), this.eventContext.getEnvelope()));
    }

    private Optional<Transaction> getOptionalTransaction() throws IrisTransactionException {
        try {
            return Optional.ofNullable(this.transactionManager.getTransaction());
        } catch (SystemException e) {
            throw new IrisTransactionException("Exception retrieving transaction from transaction manager", e);
        }
    }

    private void registerDefaultTransactionCallback(Transaction transaction) throws IrisSendException {
        try {
            transaction.registerSynchronization(new ProducerSynchronization(transaction));
        } catch (RollbackException | SystemException e) {
            throw new IrisSendException("Exception registering transaction callback", e);
        }
    }

    private void executeTxPublish(Transaction transaction) throws IOException, IrisSendException {
        LinkedList linkedList = (LinkedList) this.transactionDelayedMessages.get(transaction);
        Message message = linkedList != null ? (Message) linkedList.poll() : null;
        while (true) {
            Message message2 = message;
            if (message2 == null) {
                return;
            }
            this.eventContext.setEnvelope(message2.envelope());
            this.eventContext.setBasicProperties(message2.properties());
            executePublish(message2.message(), message2.routingDetails());
            message = (Message) linkedList.poll();
        }
    }

    private void executePublish(Object obj, RoutingDetails routingDetails) throws IrisSendException {
        String exchange = routingDetails.getExchange();
        String routingKey = routingDetails.getRoutingKey();
        try {
            byte[] writeValueAsBytes = this.objectMapper.writeValueAsBytes(obj);
            synchronized (this.lock) {
                AMQP.BasicProperties orCreateAmqpBasicProperties = this.basicPropertiesProvider.getOrCreateAmqpBasicProperties(routingDetails);
                Channel orCreateChannelById = this.channelService.getOrCreateChannelById(ChannelKey.create(exchange, routingKey));
                log.info("publishing event to exchange: {}, routing key: {}, props: {}", new Object[]{exchange, routingKey, orCreateAmqpBasicProperties});
                orCreateChannelById.basicPublish(exchange, routingKey, true, orCreateAmqpBasicProperties, writeValueAsBytes);
                if (shouldWaitForConfirmations()) {
                    waitForConfirmations(orCreateChannelById);
                }
            }
        } catch (IOException e) {
            throw new IrisSendException("Exception executing publish.", e);
        }
    }

    private void waitForConfirmations(Channel channel) throws IrisSendException {
        try {
            try {
                channel.waitForConfirms(WAIT_TIMEOUT_MILLIS);
                this.count.set(0);
            } catch (InterruptedException | TimeoutException e) {
                throw new IrisSendException("Exception waiting for confirmations.", e);
            }
        } catch (Throwable th) {
            this.count.set(0);
            throw th;
        }
    }

    private String getRoutingKey(org.iris_events.annotations.Message message, ExchangeType exchangeType) {
        return exchangeType == ExchangeType.FANOUT ? "" : RoutingKeyParser.getFromAnnotationClass(message);
    }

    private boolean shouldWaitForConfirmations() {
        return this.config.getConfirmationBatchSize() > 0 && ((long) this.count.incrementAndGet()) == this.config.getConfirmationBatchSize();
    }
}
