package org.wso2.broker.core;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.broker.common.data.types.FieldTable;
import org.wso2.broker.core.Exchange;
import org.wso2.broker.core.configuration.BrokerConfiguration;
import org.wso2.broker.core.store.dao.DaoFactory;
import org.wso2.broker.core.store.dao.MessageDao;
import org.wso2.broker.core.store.dao.QueueDao;
import org.wso2.broker.core.task.TaskExecutorService;

/* loaded from: input_file:org/wso2/broker/core/MessagingEngine.class */
/**
 * Package-private core of the broker. Keeps the registry of queues and exchanges, routes
 * published messages to the queues bound to an exchange, and owns the task service that runs one
 * {@link MessageDeliveryTask} per created queue.
 */
final class MessagingEngine {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessagingEngine.class);

    /** Delay value handed to the delivery task service (presumably an idle back-off, in ms). */
    private static final long IDLE_TASK_DELAY_MILLIS = 100;

    /** First argument handed to the delivery task service (presumably its worker thread count). */
    private static final int WORKER_COUNT = 5;

    private final TaskExecutorService<MessageDeliveryTask> deliveryTaskService;
    private final MessageDao messageDao;
    private final QueueDao queueDao;
    private final AtomicLong messageIdGenerator;

    /** Queue handlers keyed by queue name. */
    private final Map<String, QueueHandler> queueRegistry = new ConcurrentHashMap<>();
    private final ExchangeRegistry exchangeRegistry = new ExchangeRegistry();

    /**
     * Creates the engine, its DAOs and the (not yet started) delivery task service.
     *
     * @param brokerConfiguration configuration whose datasource section is used to build the DAOs
     */
    public MessagingEngine(BrokerConfiguration brokerConfiguration) {
        DaoFactory daoFactory = new DaoFactory(brokerConfiguration.getDatasource());
        this.messageDao = daoFactory.createMesageDao();
        this.queueDao = daoFactory.createQueueDao();
        this.deliveryTaskService = new TaskExecutorService<>(
                WORKER_COUNT,
                IDLE_TASK_DELAY_MILLIS,
                new ThreadFactoryBuilder().setNameFormat("MessageDeliveryTaskThreadPool-%d").build());
        this.messageIdGenerator = new AtomicLong(0L);
    }

    /**
     * Binds a queue to an exchange. An empty routing key is silently ignored (no binding is made).
     *
     * @param str        name of the queue to bind
     * @param str2       name of the exchange to bind to
     * @param str3       routing key for the binding
     * @param fieldTable arguments forwarded to the exchange together with the binding
     * @throws BrokerException if the exchange or the queue is unknown
     */
    public void bind(String str, String str2, String str3, FieldTable fieldTable) throws BrokerException {
        Exchange exchange = this.exchangeRegistry.getExchange(str2);
        QueueHandler queueHandler = this.queueRegistry.get(str);
        if (exchange == null) {
            throw new BrokerException("Unknown exchange name: " + str2);
        }
        if (queueHandler == null) {
            throw new BrokerException("Unknown queue name: " + str);
        }
        if (!str3.isEmpty()) {
            exchange.bind(queueHandler.getQueue(), str3, fieldTable);
        }
    }

    /**
     * Removes a binding between a queue and an exchange.
     *
     * @param str  name of the bound queue
     * @param str2 name of the exchange
     * @param str3 routing key of the binding to remove
     * @throws BrokerException if the exchange or the queue is unknown
     */
    public void unbind(String str, String str2, String str3) throws BrokerException {
        Exchange exchange = this.exchangeRegistry.getExchange(str2);
        QueueHandler queueHandler = this.queueRegistry.get(str);
        if (exchange == null) {
            throw new BrokerException("Unknown exchange name: " + str2);
        }
        if (queueHandler == null) {
            throw new BrokerException("Unknown queue name: " + str);
        }
        exchange.unbind(queueHandler.getQueue(), str3);
    }

    /**
     * Declares a queue. A new queue is registered, bound to the default exchange under its own name
     * and given a delivery task. For an existing queue a non-passive declaration must match the
     * existing durable/auto-delete flags.
     *
     * @param str name of the queue
     * @param z   passive flag; when set the queue is never created, only checked for existence
     * @param z2  durable flag
     * @param z3  auto-delete flag
     * @throws BrokerException if passive is set and the queue does not exist, or if an existing
     *                         queue does not match the given flags
     */
    public void createQueue(String str, boolean z, boolean z2, boolean z3) throws BrokerException {
        QueueHandler existingHandler = this.queueRegistry.get(str);
        if (existingHandler == null) {
            if (z) {
                throw new BrokerException("QueueHandler [ " + str
                        + " ] doesn't exists. Passive parameter is set, hence not creating the queue.");
            }
            // NOTE(review): the trailing 1000 is presumably the queue capacity - confirm against Queue.
            QueueHandler newHandler = new QueueHandler(new Queue(str, z, z2, z3, 1000));
            this.queueRegistry.put(str, newHandler);
            ExchangeRegistry.DEFAULT_EXCHANGE.bind(newHandler.getQueue(), str, FieldTable.EMPTY_TABLE);
            this.deliveryTaskService.add(new MessageDeliveryTask(newHandler));
            return;
        }
        if (z) {
            // Passive declaration of an existing queue: nothing to verify.
            return;
        }
        Queue existingQueue = existingHandler.getQueue();
        if (existingQueue.isDurable() != z2 || existingQueue.isAutoDelete() != z3) {
            throw new BrokerException("Existing QueueHandler [ " + str + " ] does not match given parameters.");
        }
    }

    /**
     * Routes a message to every queue bound to its exchange for its routing key. Each target queue
     * receives a shallow copy; the message passed in is released once routing has finished.
     *
     * <p>NOTE(review): the message is not released when the exchange is unknown or when routing
     * fails with an exception - confirm whether the caller releases it on those paths.
     *
     * @param message message to route
     * @throws BrokerException if the exchange named in the message metadata is unknown
     */
    public void publish(Message message) throws BrokerException {
        Metadata metadata = message.getMetadata();
        Exchange exchange = this.exchangeRegistry.getExchange(metadata.getExchangeName());
        if (exchange == null) {
            throw new BrokerException("Message publish failed. Unknown exchange: " + metadata.getExchangeName());
        }
        String routingKey = metadata.getRoutingKey();
        BindingSet bindingsForRoute = exchange.getBindingsForRoute(routingKey);
        if (bindingsForRoute.isEmpty()) {
            LOGGER.info("Dropping message since no queues found for routing key {}", routingKey);
            message.release();
            return;
        }
        boolean delivered = false;
        for (Binding binding : bindingsForRoute.getUnfilteredBindings()) {
            delivered |= pushToInMemoryQueue(message, binding);
        }
        for (Binding binding : bindingsForRoute.getFilteredBindings()) {
            if (binding.getFilterExpression().evaluate(metadata)) {
                delivered |= pushToInMemoryQueue(message, binding);
            }
        }
        if (!delivered) {
            LOGGER.info("Dropping message since message didn't have any routes for routing key {}", routingKey);
        }
        // The queues hold their own shallow copies; release the one handed to us.
        message.release();
    }

    /**
     * Enqueues a shallow copy of the message on the queue referenced by the binding.
     *
     * @return {@code true} if the copy was accepted by the queue handler, {@code false} if it was
     *         rejected or the queue is no longer registered
     */
    private boolean pushToInMemoryQueue(Message message, Binding binding) {
        String queueName = binding.getQueue().getName();
        QueueHandler queueHandler = this.queueRegistry.get(queueName);
        if (queueHandler == null) {
            // deleteQueue() drops the handler but does not remove bindings, so a binding can
            // outlive its queue. Treat that as "not delivered" instead of failing with an NPE.
            LOGGER.warn("Skipping delivery, queue [ {} ] is bound but not registered", queueName);
            return false;
        }
        message.getMetadata().addOwnedQueue(queueName);
        Message copy = message.shallowCopy();
        boolean enqueued = queueHandler.enqueue(copy);
        if (!enqueued) {
            copy.release();
        }
        return enqueued;
    }

    /**
     * Forwards an acknowledgement to the queue handler and detaches the entry from the queue in the
     * message store. Acknowledgements for an unregistered queue are logged and ignored.
     *
     * @param str name of the queue
     * @param j   identifier forwarded to the queue handler and to the message DAO
     * @param z   flag forwarded unchanged to the queue handler
     */
    public void acknowledge(String str, long j, boolean z) {
        QueueHandler queueHandler = this.queueRegistry.get(str);
        if (queueHandler == null) {
            // The queue may have been deleted while the acknowledgement was in flight.
            LOGGER.warn("Ignoring acknowledgement {} for unknown queue [ {} ]", j, str);
            return;
        }
        queueHandler.acknowledge(j, z);
        this.messageDao.detachFromQueue(str, Long.valueOf(j));
    }

    /**
     * Deletes a queue: removes its delivery task and registry entry, closes its consumers and
     * deletes it through the queue DAO. Deleting an unknown queue is a no-op.
     *
     * @param str name of the queue
     * @param z   ifUnused flag; when set the queue must be unused
     * @param z2  ifEmpty flag; when set the queue must be empty
     * @throws BrokerException if one of the set preconditions does not hold
     */
    public void deleteQueue(String str, boolean z, boolean z2) throws BrokerException {
        QueueHandler queueHandler = this.queueRegistry.get(str);
        if (queueHandler == null) {
            return;
        }
        if (z && !queueHandler.isUnused()) {
            throw new BrokerException("Cannot delete queue. Queue [ " + str
                    + " ] has active consumers and the ifUnused parameter is set.");
        }
        if (z2 && !queueHandler.isEmpty()) {
            throw new BrokerException("Cannot delete queue. Queue [ " + str
                    + " ] is not empty and the ifEmpty parameter is set.");
        }
        this.deliveryTaskService.remove(str);
        this.queueRegistry.remove(str);
        queueHandler.closeAllConsumers();
        this.queueDao.delete(queueHandler.getQueue());
    }

    /**
     * Registers a consumer on the queue it names.
     *
     * @param consumer consumer to add
     * @throws BrokerException if the queue does not exist
     */
    public void consume(Consumer consumer) throws BrokerException {
        QueueHandler queueHandler = this.queueRegistry.get(consumer.getQueueName());
        if (queueHandler == null) {
            throw new BrokerException("Cannot add consumer. Queue [ " + consumer.getQueueName()
                    + " ] not found. Create the queue before attempting to consume.");
        }
        queueHandler.addConsumer(consumer);
    }

    /** Starts the delivery task service. */
    public void startMessageDelivery() {
        this.deliveryTaskService.start();
    }

    /** Stops the delivery task service. */
    public void stopMessageDelivery() {
        this.deliveryTaskService.stop();
    }

    /**
     * Declares an exchange in the exchange registry.
     *
     * @param str  exchange name
     * @param str2 exchange type name, resolved through {@code Exchange.Type.from}
     * @param z    flag forwarded unchanged to the registry
     * @param z2   flag forwarded unchanged to the registry
     * @throws BrokerException propagated from the exchange registry
     */
    public void createExchange(String str, String str2, boolean z, boolean z2) throws BrokerException {
        this.exchangeRegistry.declareExchange(str, Exchange.Type.from(str2), z, z2);
    }

    /**
     * Deletes an exchange from the exchange registry.
     *
     * @param str  exchange name
     * @param str2 exchange type name, resolved through {@code Exchange.Type.from}
     * @param z    flag forwarded unchanged to the registry
     * @throws BrokerException propagated from the exchange registry
     */
    public void deleteExchange(String str, String str2, boolean z) throws BrokerException {
        this.exchangeRegistry.deleteExchange(str, Exchange.Type.from(str2), z);
    }

    /**
     * Removes a consumer from its queue; does nothing if the queue is not registered.
     *
     * @param consumer consumer to remove
     */
    public void closeConsumer(Consumer consumer) {
        QueueHandler queueHandler = this.queueRegistry.get(consumer.getQueueName());
        if (queueHandler != null) {
            queueHandler.removeConsumer(consumer);
        }
    }

    /**
     * Returns the next value of the engine-local message id counter (starts at 1).
     *
     * @return a fresh, monotonically increasing id
     */
    public long getNextMessageId() {
        return this.messageIdGenerator.incrementAndGet();
    }
}
