package org.joyqueue.broker.mqtt.publish;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.broker.consumer.Consume;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.mqtt.cluster.MqttConnectionManager;
import org.joyqueue.broker.mqtt.session.MqttSession;
import org.joyqueue.broker.mqtt.util.MqttMessageSerializer;
import org.joyqueue.broker.producer.Produce;
import org.joyqueue.domain.QosLevel;
import org.joyqueue.exception.JoyQueueException;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.network.session.Consumer;
import org.joyqueue.network.session.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/mqtt/publish/MessagePublisher.class */
public class MessagePublisher {
    private static final Logger LOG = LoggerFactory.getLogger(MessagePublisher.class);
    private Produce produce;
    private Consume consume;
    private MqttConnectionManager connectionManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.joyqueue.broker.mqtt.publish.MessagePublisher$1, reason: invalid class name */
    /* loaded from: input_file:org/joyqueue/broker/mqtt/publish/MessagePublisher$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS = new int[MqttQoS.values().length];

        static {
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_MOST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.EXACTLY_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.FAILURE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public MessagePublisher(BrokerContext brokerContext, MqttConnectionManager mqttConnectionManager) {
        this.produce = brokerContext.getProduce();
        this.consume = brokerContext.getConsume();
        this.connectionManager = mqttConnectionManager;
    }

    public void publishMessage(Producer producer, Channel channel, MqttPublishMessage mqttPublishMessage) throws JoyQueueException {
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        int packetId = mqttPublishMessage.variableHeader().packetId();
        this.produce.putMessageAsync(producer, Collections.singletonList(MqttMessageSerializer.convertToBrokerMsg(channel, mqttPublishMessage)), QosLevel.RECEIVE, writeResult -> {
            processPublishResult(channel, qosLevel, packetId);
        });
    }

    private void processPublishResult(Channel channel, MqttQoS mqttQoS, int i) {
        switch (AnonymousClass1.$SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[mqttQoS.ordinal()]) {
            case 1:
            case 3:
            default:
                return;
            case 2:
                sendPubAck(channel, Integer.valueOf(i));
                return;
            case 4:
                LOG.info("Received failure qos message, ignore...");
                return;
        }
    }

    private void sendPubAck(Channel channel, Integer num) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("发送PubAck消息给客户端");
        }
        try {
            channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(num.intValue()), (Object) null));
        } catch (Throwable th) {
            LOG.error("Send pubAck error!", th);
            channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    }

    public void publish2Subscriber(String str, String str2, MqttSession mqttSession, Consumer consumer, int i) throws Exception {
        PullResult message = this.consume.getMessage(consumer, 1, 120000);
        String topic = message.getTopic();
        List buffers = message.getBuffers();
        if (buffers == null || buffers.size() <= 0) {
            return;
        }
        BrokerMessage readBrokerMessage = Serializer.readBrokerMessage((ByteBuffer) buffers.get(0));
        MqttPublishMessage newMessage = MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(MqttMessageSerializer.getLowerQos(MqttMessageSerializer.readExtension(readBrokerMessage), i)), false, 0), new MqttPublishVariableHeader(topic, mqttSession.getMessageAcknowledgedZone().acquireAcknowledgedPosition(readBrokerMessage)), Unpooled.wrappedBuffer(readBrokerMessage.getByteBody()));
        if (this.connectionManager.isConnected(str2)) {
            Channel channel = this.connectionManager.getConnection(str2).getChannel();
            if (channel.isActive() && channel.isOpen()) {
                channel.writeAndFlush(newMessage).addListener(channelFuture -> {
                    if (!channelFuture.isSuccess()) {
                        LOG.error("publish message error, thread: <{}>, clientID: <{}>, message: <{}>, cause: <{}>", new Object[]{str, str2, readBrokerMessage, channelFuture.cause()});
                        throw new Exception(channelFuture.cause());
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("推送消息成功: {}", newMessage);
                    }
                });
            }
        }
    }
}
