/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolHandler;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTRoutingHandler;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;

public class MQTTProtocolManager
extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection, MQTTRoutingHandler>
implements NotificationListener {
    private static final Logger logger = Logger.getLogger(MQTTProtocolManager.class);
    private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1");
    private ActiveMQServer server;
    private MQTTLogger log = MQTTLogger.LOGGER;
    private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<MQTTInterceptor>();
    private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<MQTTInterceptor>();
    private final Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<String, MQTTConnection>();
    private final Map<String, MQTTSessionState> sessionStates = new ConcurrentHashMap<String, MQTTSessionState>();
    private int defaultMqttSessionExpiryInterval = -1;
    private int topicAliasMaximum = MQTTUtil.DEFAULT_TOPIC_ALIAS_MAX;
    private int receiveMaximum = MQTTUtil.DEFAULT_RECEIVE_MAXIMUM;
    private int serverKeepAlive = 60;
    private int maximumPacketSize = 0xFFFFFFF;
    private boolean closeMqttConnectionOnPublishAuthorizationFailure = true;
    private final MQTTRoutingHandler routingHandler;

    MQTTProtocolManager(ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
        this.server = server;
        this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
        server.getManagementService().addNotificationListener((NotificationListener)this);
        this.routingHandler = new MQTTRoutingHandler(server);
    }

    public int getDefaultMqttSessionExpiryInterval() {
        return this.defaultMqttSessionExpiryInterval;
    }

    public MQTTProtocolManager setDefaultMqttSessionExpiryInterval(int sessionExpiryInterval) {
        this.defaultMqttSessionExpiryInterval = sessionExpiryInterval;
        return this;
    }

    public int getTopicAliasMaximum() {
        return this.topicAliasMaximum;
    }

    public MQTTProtocolManager setTopicAliasMaximum(int topicAliasMaximum) {
        this.topicAliasMaximum = topicAliasMaximum;
        return this;
    }

    public int getReceiveMaximum() {
        return this.receiveMaximum;
    }

    public MQTTProtocolManager setReceiveMaximum(int receiveMaximum) {
        this.receiveMaximum = receiveMaximum;
        return this;
    }

    public int getMaximumPacketSize() {
        return this.maximumPacketSize;
    }

    public MQTTProtocolManager setMaximumPacketSize(int maximumPacketSize) {
        this.maximumPacketSize = maximumPacketSize;
        return this;
    }

    public int getServerKeepAlive() {
        return this.serverKeepAlive;
    }

    public MQTTProtocolManager setServerKeepAlive(int serverKeepAlive) {
        this.serverKeepAlive = serverKeepAlive;
        return this;
    }

    public boolean isCloseMqttConnectionOnPublishAuthorizationFailure() {
        return this.closeMqttConnectionOnPublishAuthorizationFailure;
    }

    public void setCloseMqttConnectionOnPublishAuthorizationFailure(boolean closeMqttConnectionOnPublishAuthorizationFailure) {
        this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
    }

    public void onNotification(Notification notification) {
        String clientId;
        MQTTConnection mqttConnection;
        if (!(notification.getType() instanceof CoreNotificationType)) {
            return;
        }
        CoreNotificationType type = (CoreNotificationType)notification.getType();
        if (type != CoreNotificationType.SESSION_CREATED) {
            return;
        }
        TypedProperties props = notification.getProperties();
        SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
        if (protocolName == null || !protocolName.toString().startsWith("MQTT")) {
            return;
        }
        int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
        if (distance > 0 && (mqttConnection = this.connectedClients.get(clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString())) != null) {
            mqttConnection.destroy();
        }
    }

    public ProtocolManagerFactory getFactory() {
        return new MQTTProtocolManagerFactory();
    }

    public void updateInterceptors(List incoming, List outgoing) {
        this.incomingInterceptors.clear();
        this.incomingInterceptors.addAll(this.getFactory().filterInterceptors(incoming));
        this.outgoingInterceptors.clear();
        this.outgoingInterceptors.addAll(this.getFactory().filterInterceptors(outgoing));
    }

    public void scanSessions() {
        MQTTSessionState state;
        ArrayList<String> toRemove = new ArrayList<String>();
        for (Map.Entry<String, MQTTSessionState> entry : this.sessionStates.entrySet()) {
            state = entry.getValue();
            logger.debugf("Inspecting session: %s", (Object)state);
            int sessionExpiryInterval = this.getSessionExpiryInterval(state);
            if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (long)(sessionExpiryInterval * 1000) < System.currentTimeMillis()) {
                toRemove.add(entry.getKey());
            }
            if (!state.isWill() || state.isAttached() || !state.isFailed() || state.isWillSent() || state.getWillDelayInterval() <= 0L || state.getDisconnectedTime() + state.getWillDelayInterval() * 1000L >= System.currentTimeMillis()) continue;
            state.getSession().sendWillMessage();
        }
        for (String key : toRemove) {
            logger.debugf("Removing state for session: %s", (Object)key);
            state = this.sessionStates.remove(key);
            if (state == null || !state.isWill() || state.isAttached() || !state.isFailed() || state.isWillSent()) continue;
            state.getSession().sendWillMessage();
        }
    }

    private int getSessionExpiryInterval(MQTTSessionState state) {
        int sessionExpiryInterval = state.getClientSessionExpiryInterval() == 0 ? this.getDefaultMqttSessionExpiryInterval() : state.getClientSessionExpiryInterval();
        return sessionExpiryInterval;
    }

    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
        try {
            MQTTConnection mqttConnection = new MQTTConnection(connection);
            ConnectionEntry entry = new ConnectionEntry((RemotingConnection)mqttConnection, null, System.currentTimeMillis(), this.getServerKeepAlive() == -1 || this.getServerKeepAlive() == 0 ? -1L : (long)this.getServerKeepAlive() * 1500L);
            NettyServerConnection nettyConnection = (NettyServerConnection)connection;
            MQTTProtocolHandler protocolHandler = (MQTTProtocolHandler)nettyConnection.getChannel().pipeline().get(MQTTProtocolHandler.class);
            protocolHandler.setConnection(mqttConnection, entry);
            return entry;
        }
        catch (Exception e) {
            this.log.error(e);
            return null;
        }
    }

    public boolean acceptsNoHandshake() {
        return false;
    }

    public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) {
        connection.bufferReceived(connection.getID(), buffer);
    }

    public void addChannelHandlers(ChannelPipeline pipeline) {
        pipeline.addLast(new ChannelHandler[]{MqttEncoder.INSTANCE});
        pipeline.addLast(new ChannelHandler[]{new MqttDecoder(0xFFFFFFF)});
        pipeline.addLast(new ChannelHandler[]{new MQTTProtocolHandler(this.server, this)});
    }

    public boolean isProtocol(byte[] array) {
        ByteBuf buf = Unpooled.wrappedBuffer((byte[])array);
        if (this.readByte(buf) != 16 || !this.validateRemainingLength(buf) || this.readByte(buf) != 0) {
            return false;
        }
        byte b = this.readByte(buf);
        return b != 4 && b != 6 || this.readByte(buf) == 77 && this.readByte(buf) == 81;
    }

    byte readByte(ByteBuf buf) {
        byte b = buf.readByte();
        if (this.log.isTraceEnabled()) {
            this.log.trace(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0'));
        }
        return b;
    }

    private boolean validateRemainingLength(ByteBuf buffer) {
        int msb = -128;
        for (int i = 0; i < 4; i = (int)((byte)(i + 1))) {
            if ((this.readByte(buffer) & msb) == msb) continue;
            return true;
        }
        return false;
    }

    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
    }

    public List<String> websocketSubprotocolIdentifiers() {
        return websocketRegistryNames;
    }

    public MQTTRoutingHandler getRoutingHandler() {
        return this.routingHandler;
    }

    public String invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
        return super.invokeInterceptors(this.incomingInterceptors, (Object)mqttMessage, (RemotingConnection)connection);
    }

    public String invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
        return super.invokeInterceptors(this.outgoingInterceptors, (Object)mqttMessage, (RemotingConnection)connection);
    }

    public boolean isClientConnected(String clientId, MQTTConnection connection) {
        MQTTConnection connectedConn = this.connectedClients.get(clientId);
        if (connectedConn != null) {
            return connectedConn.equals(connection);
        }
        return false;
    }

    public void removeConnectedClient(String clientId) {
        this.connectedClients.remove(clientId);
    }

    public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) {
        return this.connectedClients.put(clientId, connection);
    }

    public MQTTSessionState getSessionState(String clientId) {
        return this.sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);
    }

    public MQTTSessionState removeSessionState(String clientId) {
        if (clientId == null) {
            return null;
        }
        return this.sessionStates.remove(clientId);
    }

    public Map<String, MQTTSessionState> getSessionStates() {
        return new HashMap<String, MQTTSessionState>(this.sessionStates);
    }

    public Map<String, MQTTConnection> getConnectedClients() {
        return this.connectedClients;
    }
}

