package org.joyqueue.broker.mqtt.cluster;

import com.google.common.base.Strings;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.mqtt.connection.MqttConnection;
import org.joyqueue.broker.mqtt.publish.MessagePublisher;
import org.joyqueue.broker.mqtt.session.MqttSession;
import org.joyqueue.broker.mqtt.subscriptions.MqttSubscription;
import org.joyqueue.broker.mqtt.util.PollSelector;
import org.joyqueue.broker.mqtt.util.Selector;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.service.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/mqtt/cluster/MqttConsumerManager.class */
public class MqttConsumerManager extends Service {
    private static final Logger LOG = LoggerFactory.getLogger(MqttConsumerManager.class);
    private static int CONSUME_THREAD_TOTAL = 10;
    private static int ASYNC_PUB_THREAD_TOTAL = 100;
    private static int ASYNC_ACK_THREAD_TOTAL = 50;
    private ExecutorService executorService;
    private ExecutorService asyncPublishExecutorService;
    private ExecutorService asyncAcknowledgeExecutorService;
    private Consume consume;
    private MqttConnectionManager connectionManager;
    private MqttSessionManager sessionManager;
    private MessagePublisher messagePublisher;
    private Selector selector = new PollSelector();
    private ConcurrentMap<Integer, Runnable> consumeThreadMap = new ConcurrentHashMap();
    private ConcurrentMap<String, Runnable> clientConsumeThreadMap = new ConcurrentHashMap();
    private ConcurrentMap<String, Consumer> consumers = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/joyqueue/broker/mqtt/cluster/MqttConsumerManager$ConsumeTask.class */
    public class ConsumeTask implements Runnable {
        private String name = "consume-";
        private boolean isRunning = true;
        private ConcurrentMap<String, MqttSession> clientConsumeMap = new ConcurrentHashMap();

        ConsumeTask(String str) {
            this.name += str;
        }

        public boolean isRunning() {
            return this.isRunning;
        }

        public void setRunning(boolean z) {
            this.isRunning = z;
        }

        public void addClientConsume(String str, MqttSession mqttSession) {
            this.clientConsumeMap.put(str, mqttSession);
        }

        public void removeClientConsume(String str) {
            this.clientConsumeMap.remove(str);
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.isRunning) {
                try {
                } catch (Exception e) {
                    MqttConsumerManager.LOG.error("Thread: <{}>, mqtt consume client message consume error, topic: <{}>, cause: <{}>", this.name, e.getMessage());
                }
                if (this.clientConsumeMap.size() > 0) {
                    this.clientConsumeMap.forEach((str, mqttSession) -> {
                        Set<MqttSubscription> listSubsciptions = mqttSession.listSubsciptions();
                        if (listSubsciptions == null || listSubsciptions.size() <= 0) {
                            return;
                        }
                        listSubsciptions.forEach(mqttSubscription -> {
                            String topicFilter = mqttSubscription.getTopicFilter().toString();
                            int value = mqttSubscription.getRequestedQos().value();
                            Consumer consumer = MqttConsumerManager.this.getConsumer(str, topicFilter);
                            if (consumer != null) {
                                try {
                                    MqttConsumerManager.this.asyncPublishExecutorService.execute(() -> {
                                        try {
                                            MqttConsumerManager.this.messagePublisher.publish2Subscriber(this.name, str, mqttSession, consumer, value);
                                        } catch (Exception e2) {
                                            MqttConsumerManager.LOG.error(e2.getMessage(), e2);
                                        }
                                    });
                                } catch (Exception e2) {
                                    MqttConsumerManager.LOG.error(e2.getMessage(), e2);
                                }
                            }
                        });
                    });
                    try {
                        Thread.sleep(800L);
                    } catch (InterruptedException e2) {
                        MqttConsumerManager.LOG.warn("mqtt consumer manager thread: <{}> interrupted, exception: {}", this.name, e2.getMessage());
                    }
                } else {
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e3) {
                        MqttConsumerManager.LOG.warn("mqtt consumer manager thread: <{}> interrupted, exception: {}", this.name, e3.getMessage());
                    }
                }
                MqttConsumerManager.LOG.error("Thread: <{}>, mqtt consume client message consume error, topic: <{}>, cause: <{}>", this.name, e.getMessage());
            }
            MqttConsumerManager.LOG.info("mqtt consumer manager thread: <{}> stop.", this.name);
        }
    }

    public MqttConsumerManager(BrokerContext brokerContext, MqttConnectionManager mqttConnectionManager, MqttSessionManager mqttSessionManager, MessagePublisher messagePublisher) {
        this.consume = brokerContext.getConsume();
        this.connectionManager = mqttConnectionManager;
        this.sessionManager = mqttSessionManager;
        this.messagePublisher = messagePublisher;
    }

    protected void validate() throws Exception {
        super.validate();
        this.executorService = Executors.newFixedThreadPool(CONSUME_THREAD_TOTAL, new NamedThreadFactory("mqtt-consume"));
        this.asyncPublishExecutorService = Executors.newFixedThreadPool(ASYNC_PUB_THREAD_TOTAL, new NamedThreadFactory("mqtt-async-publish"));
        this.asyncAcknowledgeExecutorService = Executors.newFixedThreadPool(ASYNC_ACK_THREAD_TOTAL, new NamedThreadFactory("mqtt-async-acknowledge"));
    }

    public void start() throws Exception {
        super.start();
        for (int i = 0; i < CONSUME_THREAD_TOTAL; i++) {
            ConsumeTask consumeTask = new ConsumeTask(Integer.toString(i));
            this.consumeThreadMap.put(Integer.valueOf(i), consumeTask);
            this.executorService.execute(consumeTask);
        }
    }

    public void stop() {
        super.stop();
        if (this.executorService.isShutdown()) {
            return;
        }
        this.clientConsumeThreadMap.forEach((str, runnable) -> {
            if (runnable != null) {
                ((ConsumeTask) runnable).setRunning(false);
            }
        });
        this.executorService.shutdownNow();
    }

    public void fireConsume(String str) {
        this.clientConsumeThreadMap.put(str, selectThreadConsume(str, this.sessionManager.getSession(str)));
    }

    public void stopConsume(String str) {
        ConsumeTask consumeTask = (ConsumeTask) this.clientConsumeThreadMap.get(str);
        if (consumeTask != null) {
            consumeTask.removeClientConsume(str);
            this.clientConsumeThreadMap.remove(str);
        }
        removeConsumer(str);
    }

    public void acknowledge(String str, int i) {
        BrokerMessage acquireAcknowledgedMessage;
        MqttSession session = this.sessionManager.getSession(str);
        if (session == null || (acquireAcknowledgedMessage = session.getMessageAcknowledgedZone().acquireAcknowledgedMessage(Integer.valueOf(i))) == null) {
            return;
        }
        short partition = acquireAcknowledgedMessage.getPartition();
        long msgIndexNo = acquireAcknowledgedMessage.getMsgIndexNo();
        Consumer consumer = getConsumer(str, acquireAcknowledgedMessage.getTopic());
        if (consumer != null) {
            this.asyncAcknowledgeExecutorService.submit(() -> {
                commitAcknowledge(consumer, partition, msgIndexNo);
            });
        }
    }

    private ConsumeTask selectThreadConsume(String str, MqttSession mqttSession) {
        ConsumeTask consumeTask = (ConsumeTask) this.consumeThreadMap.get(Integer.valueOf(this.selector.select(str, CONSUME_THREAD_TOTAL)));
        consumeTask.addClientConsume(str, mqttSession);
        return consumeTask;
    }

    /* JADX WARN: Code restructure failed: missing block: B:6:0x004f, code lost:
    
        if (r0 == null) goto L8;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public org.joyqueue.network.session.Consumer getConsumer(java.lang.String r8, java.lang.String r9) {
        /*
            r7 = this;
            r0 = 0
            r10 = r0
            r0 = r7
            org.joyqueue.broker.mqtt.cluster.MqttConnectionManager r0 = r0.connectionManager
            r1 = r8
            boolean r0 = r0.isConnected(r1)
            if (r0 == 0) goto Lab
            r0 = r7
            org.joyqueue.broker.mqtt.cluster.MqttConnectionManager r0 = r0.connectionManager
            r1 = r8
            org.joyqueue.broker.mqtt.connection.MqttConnection r0 = r0.getConnection(r1)
            r11 = r0
            r0 = r11
            java.lang.String r0 = r0.getApplication()
            r12 = r0
            r0 = r11
            java.lang.String r0 = r0.getClientGroupName()
            r13 = r0
            r0 = r11
            r1 = r9
            r2 = r12
            java.lang.String r0 = r0.getConsumer(r1, r2)
            r14 = r0
            r0 = r14
            boolean r0 = com.google.common.base.Strings.isNullOrEmpty(r0)
            if (r0 != 0) goto L52
            r0 = r7
            java.util.concurrent.ConcurrentMap<java.lang.String, org.joyqueue.network.session.Consumer> r0 = r0.consumers
            r1 = r7
            r2 = r8
            r3 = r9
            r4 = r12
            r5 = r13
            java.lang.String r1 = r1.generateConsumerId(r2, r3, r4, r5)
            java.lang.Object r0 = r0.get(r1)
            org.joyqueue.network.session.Consumer r0 = (org.joyqueue.network.session.Consumer) r0
            r1 = r0
            r10 = r1
            if (r0 != 0) goto Lab
        L52:
            r0 = r7
            r1 = r8
            r2 = r9
            r3 = r12
            r4 = r13
            java.lang.String r0 = r0.generateConsumerId(r1, r2, r3, r4)
            r14 = r0
            org.joyqueue.network.session.Consumer r0 = new org.joyqueue.network.session.Consumer
            r1 = r0
            r1.<init>()
            r10 = r0
            r0 = r10
            r1 = r14
            r0.setId(r1)
            r0 = r10
            r1 = r11
            java.lang.String r1 = r1.getId()
            r0.setConnectionId(r1)
            r0 = r10
            r1 = r12
            r0.setApp(r1)
            r0 = r10
            r1 = r9
            r0.setTopic(r1)
            r0 = r10
            org.joyqueue.network.session.Consumer$ConsumeType r1 = org.joyqueue.network.session.Consumer.ConsumeType.MQTT
            r0.setType(r1)
            r0 = r7
            java.util.concurrent.ConcurrentMap<java.lang.String, org.joyqueue.network.session.Consumer> r0 = r0.consumers
            r1 = r14
            r2 = r10
            java.lang.Object r0 = r0.putIfAbsent(r1, r2)
            org.joyqueue.network.session.Consumer r0 = (org.joyqueue.network.session.Consumer) r0
            r15 = r0
            r0 = r11
            r1 = r9
            r2 = r12
            r3 = r14
            boolean r0 = r0.addConsumer(r1, r2, r3)
            r0 = r15
            if (r0 == 0) goto Lab
            r0 = r15
            r10 = r0
        Lab:
            r0 = r10
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.joyqueue.broker.mqtt.cluster.MqttConsumerManager.getConsumer(java.lang.String, java.lang.String):org.joyqueue.network.session.Consumer");
    }

    public void removeConsumer(String str) {
        if (this.connectionManager.isConnected(str)) {
            MqttConnection connection = this.connectionManager.getConnection(str);
            Iterator<String> it = connection.getConsumers().keySet().iterator();
            while (it.hasNext()) {
                ConcurrentMap<String, String> concurrentMap = connection.getConsumers().get(it.next());
                for (String str2 : concurrentMap.keySet()) {
                    String str3 = concurrentMap.get(str2);
                    if (!Strings.isNullOrEmpty(str3)) {
                        this.consumers.remove(str3);
                        concurrentMap.remove(str2, str3);
                    }
                }
            }
        }
    }

    private String generateConsumerId(String str, String str2, String str3, String str4) {
        return String.format("%s_consumer_%s_%s_%s", str, str3, str2, str4);
    }

    private void commitAcknowledge(Consumer consumer, short s, long j) {
        try {
            this.consume.setAckIndex(consumer, s, j);
        } catch (JoyQueueException e) {
            LOG.error(e.getMessage(), e);
        }
    }
}
