package com.mware.core.model.workQueue;

import com.google.inject.Inject;
import com.mware.core.bootstrap.InjectHelper;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.ingest.WorkerSpout;
import com.mware.core.lifecycle.LifeSupportService;
import com.mware.core.status.model.QueueStatus;
import com.mware.core.status.model.Status;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import com.mware.ge.Graph;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.json.JSONArray;
import org.json.JSONObject;

/* loaded from: input_file:com/mware/core/model/workQueue/RabbitMQWorkQueueRepository.class */
/**
 * {@link WorkQueueRepository} implementation that publishes work items to RabbitMQ.
 *
 * <p>A single connection and channel are opened in {@link #start()} and reused for every
 * publish. Queues are declared lazily the first time they are used, and the names of the
 * queues declared through this instance are cached in {@code declaredQueues}.
 */
public class RabbitMQWorkQueueRepository extends WorkQueueRepository {
    private static final BcLogger LOGGER = BcLoggerFactory.getLogger(RabbitMQWorkQueueRepository.class);

    /** Value of the {@code x-max-priority} argument passed when declaring a queue. */
    private static final int QUEUE_MAX_PRIORITY = 3;

    /** Port used to build the HTTP URL queried by {@link #getQueuesStatus()}. */
    private static final int MANAGEMENT_API_PORT = 15672;

    // NOTE(review): these credentials are hard-coded and are not read from Configuration.
    // They should probably be made configurable - confirm the intended configuration keys.
    private static final String MANAGEMENT_API_CREDENTIALS = "guest:guest";

    private Connection connection;
    private Channel channel;
    private Integer deliveryMode;
    private Address[] rabbitMqAddresses;

    /** Names of the queues already declared through this instance. */
    private final Set<String> declaredQueues = new HashSet<>();

    /**
     * Creates the repository and registers it with the given life support service.
     *
     * @param graph              passed through to the superclass
     * @param configuration      passed through to the superclass; read again in {@link #start()}
     * @param lifeSupportService service this instance adds itself to
     */
    @Inject
    public RabbitMQWorkQueueRepository(Graph graph, Configuration configuration, LifeSupportService lifeSupportService) {
        super(graph, configuration);
        lifeSupportService.add(this);
    }

    /**
     * Opens the RabbitMQ connection and channel and reads the delivery mode and broker
     * addresses from the configuration.
     *
     * @throws Throwable if the connection or channel cannot be opened
     */
    @Override
    public void start() throws Throwable {
        this.connection = RabbitMQUtils.openConnection(getConfiguration());
        this.channel = RabbitMQUtils.openChannel(this.connection);
        this.deliveryMode = getConfiguration().getInt(
                RabbitMQUtils.RABBITMQ_DELIVERY_MODE,
                MessageProperties.PERSISTENT_BASIC.getDeliveryMode());
        this.rabbitMqAddresses = RabbitMQUtils.getAddresses(getConfiguration());
    }

    /**
     * Publishes a message to the named queue, declaring the queue first if necessary.
     *
     * @param str      name of the target queue
     * @param bArr     message body
     * @param priority priority to map onto the RabbitMQ message priority
     * @throws BcException if the queue cannot be declared or the message cannot be published
     */
    @Override
    public void pushOnQueue(String str, byte[] bArr, Priority priority) {
        try {
            ensureQueue(str);
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            if (this.deliveryMode != null) {
                builder.deliveryMode(this.deliveryMode);
            }
            // Decode with an explicit charset so the log output does not depend on the platform default.
            LOGGER.debug("enqueuing message to queue [%s]: %s", str, new String(bArr, StandardCharsets.UTF_8));
            builder.priority(toRabbitMQPriority(priority));
            // Publishing to the default exchange ("") routes by queue name.
            this.channel.basicPublish("", str, builder.build(), bArr);
        } catch (Exception e) {
            throw new BcException("Could not push on queue", e);
        }
    }

    /**
     * Maps a work queue priority to the numeric priority set on the published message.
     *
     * @param priority work queue priority
     * @return 2 for HIGH, 1 for NORMAL, 0 otherwise
     */
    private Integer toRabbitMQPriority(Priority priority) {
        switch (priority) {
            case HIGH:
                return 2;
            case NORMAL:
                return 1;
            case LOW:
            default:
                return 0;
        }
    }

    /**
     * Declares the named queue unless it has already been declared through this instance.
     *
     * @param str name of the queue
     * @throws IOException if the queue declaration fails
     */
    public void ensureQueue(String str) throws IOException {
        if (this.declaredQueues.contains(str)) {
            return;
        }
        createQueue(this.channel, str);
        // Only cache the name once the declaration has succeeded.
        this.declaredQueues.add(str);
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue with an
     * {@code x-max-priority} argument.
     *
     * @param channel channel used to declare the queue
     * @param str     name of the queue
     * @throws IOException if the queue declaration fails
     */
    public static void createQueue(Channel channel, String str) throws IOException {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-max-priority", QUEUE_MAX_PRIORITY);
        channel.queueDeclare(str, true, false, false, arguments);
    }

    /** No-op: this implementation does nothing on flush. */
    @Override
    public void flush() {
    }

    /**
     * Closes the channel and then the connection. Failures are logged and not rethrown so
     * that a failure closing the channel does not prevent closing the connection.
     */
    @Override
    public void shutdown() {
        // Either field may still be null if start() was never called or failed part-way.
        if (this.channel != null) {
            try {
                LOGGER.debug("Closing RabbitMQ channel");
                this.channel.close();
            } catch (Throwable th) {
                LOGGER.error("Could not close RabbitMQ channel", th);
            }
        }
        if (this.connection != null) {
            try {
                LOGGER.debug("Closing RabbitMQ connection");
                this.connection.close();
            } catch (Throwable th) {
                LOGGER.error("Could not close RabbitMQ connection", th);
            }
        }
    }

    /**
     * Fetches {@code /api/queues} over HTTP from the first configured broker host and
     * reports the {@code messages} value of every queue in the response.
     *
     * @return map from queue name to a {@link QueueStatus} holding its message count
     * @throws BcException if the request fails or the response cannot be parsed
     */
    @Override
    public Map<String, Status> getQueuesStatus() {
        try {
            Map<String, Status> statuses = new HashMap<>();
            String host = this.rabbitMqAddresses[0].getHost();
            URL url = new URL(String.format("http://%s:%d/api/queues", host, MANAGEMENT_API_PORT));
            URLConnection urlConnection = url.openConnection();
            String encodedCredentials =
                    Base64.encodeBase64String(MANAGEMENT_API_CREDENTIALS.getBytes(StandardCharsets.UTF_8));
            urlConnection.setRequestProperty("Authorization", "Basic " + encodedCredentials);

            // try-with-resources closes the stream on the failure path as well.
            try (InputStream inputStream = urlConnection.getInputStream()) {
                JSONArray queues = new JSONArray(IOUtils.toString(inputStream, StandardCharsets.UTF_8));
                for (int i = 0; i < queues.length(); i++) {
                    JSONObject queue = queues.getJSONObject(i);
                    statuses.put(queue.getString("name"), new QueueStatus(queue.getInt("messages")));
                }
            }
            return statuses;
        } catch (Exception e) {
            throw new BcException("Could not connect to RabbitMQ", e);
        }
    }

    /**
     * Deletes the named queue and forgets that it was declared.
     *
     * @param str name of the queue to delete
     * @throws BcException if the delete request fails
     */
    @Override
    protected void deleteQueue(String str) {
        try {
            this.channel.queueDelete(str);
        } catch (IOException e) {
            throw new BcException("Could not delete queue: " + str, e);
        } finally {
            // Drop the cached name so the next ensureQueue() declares the queue again instead
            // of publishing to a queue that no longer exists. Done even when the delete fails,
            // since that only costs one extra declaration on the next use.
            this.declaredQueues.remove(str);
        }
    }

    /**
     * Creates a spout that reads from the named queue.
     *
     * @param str name of the queue
     * @return a new {@link RabbitMQWorkQueueSpout} after it has been passed through {@link InjectHelper#inject}
     */
    @Override
    public WorkerSpout createWorkerSpout(String str) {
        return (WorkerSpout) InjectHelper.inject(new RabbitMQWorkQueueSpout(str));
    }
}
