package com.mware.core.model.workQueue;

import com.google.inject.Inject;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.lifecycle.LifeSupportService;
import com.mware.core.model.workQueue.WebQueueRepository;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.json.JSONObject;

/* loaded from: input_file:com/mware/core/model/workQueue/RabbitMQWebQueueRepository.class */
/**
 * {@link WebQueueRepository} backed by a RabbitMQ fanout exchange.
 *
 * <p>JSON messages are published to the {@value #CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT}
 * exchange; subscribers bind a server-named queue to that exchange and receive every
 * message on a dedicated daemon thread. Payloads are encoded and decoded as UTF-8.
 *
 * <p>NOTE(review): a single {@link Channel} is used for both publishing and consuming,
 * and only the most recently started listener thread is tracked — confirm callers
 * subscribe at most once per instance.
 */
public class RabbitMQWebQueueRepository extends WebQueueRepository {
    private static final BcLogger LOGGER = BcLoggerFactory.getLogger(RabbitMQWebQueueRepository.class);
    public static final String CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT = "exBroadcast";
    private final Connection connection;
    private final Channel channel;
    /** Listener thread started by the latest subscribe call; {@code null} until then. */
    private Thread thread;

    /**
     * Opens a connection and channel, declares the fanout broadcast exchange and
     * registers this instance with the life-support service.
     *
     * @param configuration      passed to {@code RabbitMQUtils.openConnection}
     * @param lifeSupportService service this repository registers itself with
     * @throws IOException if the channel cannot be opened or the exchange cannot be declared
     */
    @Inject
    public RabbitMQWebQueueRepository(Configuration configuration, LifeSupportService lifeSupportService) throws IOException {
        Connection openedConnection = RabbitMQUtils.openConnection(configuration);
        Channel openedChannel;
        try {
            openedChannel = RabbitMQUtils.openChannel(openedConnection);
            openedChannel.exchangeDeclare(CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, "fanout");
        } catch (IOException | RuntimeException e) {
            // Do not leak the already-opened connection when setup fails half way.
            try {
                openedConnection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.connection = openedConnection;
        this.channel = openedChannel;
        lifeSupportService.add(this);
    }

    /**
     * Publishes the given JSON object to the broadcast exchange as UTF-8 text.
     *
     * @param jSONObject message to broadcast
     * @throws BcException if the publish fails with an {@link IOException}
     */
    @Override
    public void broadcastJson(JSONObject jSONObject) {
        // Serialize once; the same text is logged and published.
        String payload = jSONObject.toString();
        try {
            LOGGER.debug("publishing message to broadcast exchange [%s]: %s", CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, payload);
            this.channel.basicPublish(CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, "", (AMQP.BasicProperties) null, payload.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new BcException("Could not broadcast json", e);
        }
    }

    /**
     * Binds a new server-named queue to the broadcast exchange and starts a daemon
     * thread that hands every received message to {@code broadcastConsumer}.
     *
     * @param broadcastConsumer callback invoked for each received JSON message
     * @throws BcException if the queue cannot be declared, bound or consumed
     */
    @Override
    public void subscribeToBroadcastMessages(WebQueueRepository.BroadcastConsumer broadcastConsumer) {
        try {
            String queue = this.channel.queueDeclare().getQueue();
            this.channel.queueBind(queue, CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, "");
            QueueingConsumer queueingConsumer = new QueueingConsumer(this.channel);
            this.channel.basicConsume(queue, true, queueingConsumer);
            Thread listener = new Thread(() -> listenForBroadcasts(queueingConsumer, broadcastConsumer));
            listener.setName("rabbitmq-subscribe-" + broadcastConsumer.getClass().getName());
            listener.setDaemon(true);
            this.thread = listener;
            listener.start();
        } catch (IOException e) {
            throw new BcException("Could not subscribe to broadcasts", e);
        }
    }

    /**
     * Delivery loop run on the listener thread. Exits when the thread is interrupted,
     * the channel shuts down, or the consumer is cancelled; a failure while handling a
     * single message is logged and the loop continues with the next delivery.
     */
    private void listenForBroadcasts(QueueingConsumer queueingConsumer, WebQueueRepository.BroadcastConsumer broadcastConsumer) {
        while (true) {
            byte[] body;
            // Waiting for a delivery is kept outside the per-message try below so that
            // termination signals end the loop instead of being logged and retried forever.
            try {
                body = queueingConsumer.nextDelivery().getBody();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.debug("broadcast listener interrupted, stopping");
                return;
            } catch (ShutdownSignalException e) {
                if (e.isInitiatedByApplication()) {
                    LOGGER.debug("broadcast listener stopping: channel was closed");
                } else {
                    LOGGER.error("broadcast listener has died", e);
                }
                return;
            } catch (ConsumerCancelledException e) {
                LOGGER.error("broadcast listener has died", e);
                return;
            }
            try {
                JSONObject json = new JSONObject(new String(body, StandardCharsets.UTF_8));
                LOGGER.debug("received message from broadcast exchange [%s]: %s", CONFIG_BROADCAST_EXCHANGE_NAME_DEFAULT, json.toString());
                broadcastConsumer.broadcastReceived(json);
            } catch (Throwable th) {
                // A bad message or a failing consumer must not kill the listener.
                LOGGER.error("problem in broadcast thread", th);
            }
        }
    }

    /**
     * Stops broadcast delivery by closing the channel (best effort).
     *
     * <p>NOTE(review): this closes the one channel that {@link #broadcastJson} also
     * publishes on, and ignores {@code broadcastConsumer} — confirm that is intended.
     *
     * @param broadcastConsumer unused
     */
    @Override
    public void unsubscribeFromBroadcastMessages(WebQueueRepository.BroadcastConsumer broadcastConsumer) {
        try {
            this.channel.close();
        } catch (Exception e) {
            // Best effort: the channel may already be closed.
            LOGGER.debug("Could not close RabbitMQ channel while unsubscribing: %s", e.toString());
        }
    }

    /**
     * Stops the listener thread (if one was started) and closes the channel and the
     * connection. Close failures are logged and never propagated.
     */
    @Override
    public void shutdown() {
        // Interrupt first so the listener leaves its blocking wait before the channel goes away.
        Thread listener = this.thread;
        if (listener != null) {
            listener.interrupt();
        }
        try {
            LOGGER.debug("Closing RabbitMQ channel", new Object[0]);
            this.channel.close();
        } catch (Throwable th) {
            LOGGER.error("Could not close RabbitMQ channel", th);
        }
        try {
            LOGGER.debug("Closing RabbitMQ connection", new Object[0]);
            this.connection.close();
        } catch (Throwable th2) {
            LOGGER.error("Could not close RabbitMQ connection", th2);
        }
    }
}
