package edu.kit.datamanager.service.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import edu.kit.datamanager.configuration.RabbitMQConfiguration;
import edu.kit.datamanager.entities.messaging.IAMQPSubmittable;
import edu.kit.datamanager.repo.dao.DummyAMQPMessageDao;
import edu.kit.datamanager.repo.dao.IAMQPMessageDao;
import edu.kit.datamanager.repo.domain.AMQPMessage;
import edu.kit.datamanager.service.IMessagingService;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:edu/kit/datamanager/service/impl/RabbitMQMessagingService.class */
public class RabbitMQMessagingService implements IMessagingService {

    @Autowired
    private Optional<RabbitMQConfiguration> configuration;

    @Autowired
    private Optional<IAMQPMessageDao> messageDao;
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQMessagingService.class);

    @Override // edu.kit.datamanager.service.IMessagingService
    public void send(IAMQPSubmittable iAMQPSubmittable) {
        logger.trace("Processing new AMQPSubmittable via RabbitMQMessagingService.");
        if (!this.configuration.isPresent() || !this.configuration.get().isMessagingEnabled()) {
            logger.trace("Messaging is disabled. All messages are discarded.");
            return;
        }
        logger.trace("Messaging enabled, serializing and submitting message.");
        boolean z = true;
        String str = null;
        String str2 = null;
        String str3 = null;
        try {
            try {
                try {
                    try {
                        str = iAMQPSubmittable.toJson();
                        str2 = iAMQPSubmittable.getRoutingKey();
                        str3 = this.configuration.get().rabbitMQExchange().getName();
                        logger.trace("Sending message {} via exchange {} and route {}.", new Object[]{str, str3, str2});
                        this.configuration.get().rabbitMQTemplate().convertAndSend(this.configuration.get().rabbitMQExchange().getName(), str2, str);
                        logger.trace("Message sent.");
                        z = false;
                        checkAndSendPreservedMessages();
                        if (0 != 0) {
                            AMQPMessage aMQPMessage = new AMQPMessage(str3, str2, str);
                            logger.trace("Persisting unsent AMQP message to database.");
                            logger.trace("AMQP message successfully persisted with id {} for later submission.", ((AMQPMessage) this.messageDao.orElse(new DummyAMQPMessageDao()).save(aMQPMessage)).getId());
                        }
                    } catch (AmqpException e) {
                        logger.error("Failed to send message. Unexpected exception occured.", e);
                        if (z) {
                            AMQPMessage aMQPMessage2 = new AMQPMessage(str3, str2, str);
                            logger.trace("Persisting unsent AMQP message to database.");
                            logger.trace("AMQP message successfully persisted with id {} for later submission.", ((AMQPMessage) this.messageDao.orElse(new DummyAMQPMessageDao()).save(aMQPMessage2)).getId());
                        }
                    }
                } catch (JsonProcessingException e2) {
                    logger.error("Failed to send message " + iAMQPSubmittable + ". Unable to serialize message to JSON.", e2);
                    z = false;
                    if (0 != 0) {
                        AMQPMessage aMQPMessage3 = new AMQPMessage(str3, str2, str);
                        logger.trace("Persisting unsent AMQP message to database.");
                        logger.trace("AMQP message successfully persisted with id {} for later submission.", ((AMQPMessage) this.messageDao.orElse(new DummyAMQPMessageDao()).save(aMQPMessage3)).getId());
                    }
                }
            } catch (AmqpConnectException e3) {
                logger.error("Failed to send message. Connection to message queue failed.", e3);
                if (z) {
                    AMQPMessage aMQPMessage4 = new AMQPMessage(str3, str2, str);
                    logger.trace("Persisting unsent AMQP message to database.");
                    logger.trace("AMQP message successfully persisted with id {} for later submission.", ((AMQPMessage) this.messageDao.orElse(new DummyAMQPMessageDao()).save(aMQPMessage4)).getId());
                }
            }
        } catch (Throwable th) {
            if (z) {
                AMQPMessage aMQPMessage5 = new AMQPMessage(str3, str2, str);
                logger.trace("Persisting unsent AMQP message to database.");
                logger.trace("AMQP message successfully persisted with id {} for later submission.", ((AMQPMessage) this.messageDao.orElse(new DummyAMQPMessageDao()).save(aMQPMessage5)).getId());
            }
            throw th;
        }
    }

    private void checkAndSendPreservedMessages() {
        logger.trace("Checking for unsubmitted messages.");
        Page findAll = this.messageDao.orElse(new DummyAMQPMessageDao()).findAll(PageRequest.of(0, 100));
        int size = findAll.getContent().size();
        logger.trace("Found {} unsubmitted messages in database.", Integer.valueOf(size));
        findAll.getContent().stream().map(aMQPMessage -> {
            logger.trace("Removing message #{} from database.", aMQPMessage.getId());
            return aMQPMessage;
        }).map(aMQPMessage2 -> {
            this.messageDao.orElse(new DummyAMQPMessageDao()).delete(aMQPMessage2);
            return aMQPMessage2;
        }).map(aMQPMessage3 -> {
            logger.trace("Resending message.");
            return aMQPMessage3;
        }).forEachOrdered(aMQPMessage4 -> {
            send(aMQPMessage4);
        });
        logger.trace("{} unsubmitted messages remaining.", Long.valueOf(findAll.getTotalElements() - size));
    }

    public Health health() {
        logger.trace("Obtaining health information.");
        return (this.configuration.isPresent() && this.configuration.get().isMessagingEnabled()) ? Health.up().withDetail("RabbitMQMessaging", this.configuration.get().rabbitMQExchange()).build() : Health.unknown().build();
    }
}
