package org.joyqueue.broker.mqtt.network;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import java.util.List;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.mqtt.handler.MqttHandlerDispatcher;
import org.joyqueue.broker.mqtt.transport.MqttCommandInvocation;
import org.joyqueue.broker.network.protocol.support.DefaultProtocolHandlerPipeline;
import org.joyqueue.network.handler.ConnectionHandler;
import org.joyqueue.network.protocol.Protocol;

/* loaded from: input_file:org/joyqueue/broker/mqtt/network/MqttOverWebsocketProtocolHandlerPipeline.class */
public class MqttOverWebsocketProtocolHandlerPipeline extends AbstractMqttProtocolPipeline {
    private static final String MQTT_SUBPROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1";
    private Protocol protocol;
    private BrokerContext brokerContext;

    /* loaded from: input_file:org/joyqueue/broker/mqtt/network/MqttOverWebsocketProtocolHandlerPipeline$ByteBufToWebSocketFrameEncoder.class */
    static class ByteBufToWebSocketFrameEncoder extends MessageToMessageEncoder<ByteBuf> {
        ByteBufToWebSocketFrameEncoder() {
        }

        protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame();
            binaryWebSocketFrame.content().writeBytes(byteBuf);
            list.add(binaryWebSocketFrame);
        }

        protected /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
            encode(channelHandlerContext, (ByteBuf) obj, (List<Object>) list);
        }
    }

    /* loaded from: input_file:org/joyqueue/broker/mqtt/network/MqttOverWebsocketProtocolHandlerPipeline$WebSocketFrameToByteBufDecoder.class */
    static class WebSocketFrameToByteBufDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {
        WebSocketFrameToByteBufDecoder() {
        }

        protected void decode(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame, List<Object> list) throws Exception {
            ByteBuf content = binaryWebSocketFrame.content();
            content.retain();
            list.add(content);
        }

        protected /* bridge */ /* synthetic */ void decode(ChannelHandlerContext channelHandlerContext, Object obj, List list) throws Exception {
            decode(channelHandlerContext, (BinaryWebSocketFrame) obj, (List<Object>) list);
        }
    }

    public MqttOverWebsocketProtocolHandlerPipeline(Protocol protocol, ChannelHandler channelHandler, BrokerContext brokerContext) {
        super(brokerContext);
        this.protocol = protocol;
        this.brokerContext = brokerContext;
        if (channelHandler instanceof DefaultProtocolHandlerPipeline) {
        }
    }

    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(65536)}).addLast(new ChannelHandler[]{new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST)}).addLast(new ChannelHandler[]{new WebSocketFrameToByteBufDecoder()}).addLast(new ChannelHandler[]{new ByteBufToWebSocketFrameEncoder()}).addLast(new ChannelHandler[]{new MqttDecoder(this.mqttContext.getMqttConfig().getMaxPayloadSize())}).addLast(new ChannelHandler[]{MqttEncoder.INSTANCE}).addLast(new ChannelHandler[]{new ConnectionHandler()}).addLast(new ChannelHandler[]{newMqttCommandInvocation()});
    }

    @Override // org.joyqueue.broker.mqtt.network.AbstractMqttProtocolPipeline
    protected MqttCommandInvocation newMqttCommandInvocation() {
        return new MqttCommandInvocation(newMqttHandlerDispatcher());
    }

    @Override // org.joyqueue.broker.mqtt.network.AbstractMqttProtocolPipeline
    protected MqttHandlerDispatcher newMqttHandlerDispatcher() {
        MqttHandlerDispatcher mqttHandlerDispatcher = new MqttHandlerDispatcher(this.protocol.createCommandHandlerFactory(), this.brokerContext, this.mqttContext);
        try {
            mqttHandlerDispatcher.start();
        } catch (Exception e) {
        }
        return mqttHandlerDispatcher;
    }
}
