/*
 * Decompiled with CFR 0.152.
 */
package net.solarnetwork.common.mqtt.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
import net.solarnetwork.common.mqtt.BaseMqttConnection;
import net.solarnetwork.common.mqtt.BasicMqttConnectionConfig;
import net.solarnetwork.common.mqtt.MqttBasicCount;
import net.solarnetwork.common.mqtt.MqttConnectReturnCode;
import net.solarnetwork.common.mqtt.MqttConnection;
import net.solarnetwork.common.mqtt.MqttConnectionConfig;
import net.solarnetwork.common.mqtt.MqttConnectionObserver;
import net.solarnetwork.common.mqtt.MqttMessage;
import net.solarnetwork.common.mqtt.MqttMessageHandler;
import net.solarnetwork.common.mqtt.MqttProperty;
import net.solarnetwork.common.mqtt.MqttPropertyType;
import net.solarnetwork.common.mqtt.MqttQos;
import net.solarnetwork.common.mqtt.MqttUtils;
import net.solarnetwork.common.mqtt.MqttVersion;
import net.solarnetwork.common.mqtt.WireLoggingSupport;
import net.solarnetwork.common.mqtt.netty.NettyMqttUtils;
import net.solarnetwork.common.mqtt.netty.client.ChannelClosedException;
import net.solarnetwork.common.mqtt.netty.client.MqttClient;
import net.solarnetwork.common.mqtt.netty.client.MqttClientCallback;
import net.solarnetwork.common.mqtt.netty.client.MqttClientConfig;
import net.solarnetwork.common.mqtt.netty.client.MqttConnectResult;
import net.solarnetwork.common.mqtt.netty.client.MqttLastWill;
import net.solarnetwork.service.CertificateException;
import net.solarnetwork.service.SSLService;
import net.solarnetwork.util.StatTracker;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

public class NettyMqttConnection
extends BaseMqttConnection
implements MqttMessageHandler,
MqttClientCallback,
WireLoggingSupport {
    /** The default {@code ioThreadCount} property value. */
    public static final int DEFAULT_IO_THREAD_COUNT = 2;
    /** The default {@code wireLogging} property value. */
    public static final boolean DEFAULT_WIRE_LOGGING = false;
    // Thread count passed to the NioEventLoopGroup created for each connection attempt.
    private int ioThreadCount = 2;
    // When true, wire logging is enabled on new clients regardless of the connection configuration's own flag.
    private boolean wireLogging = false;
    // The active client: assigned when a connect attempt completes, reset to null by closeConnection().
    private volatile MqttClient client;

    /**
     * Create a connection backed by a fresh {@link BasicMqttConnectionConfig}.
     *
     * @param executor
     *        the executor handed to the three-argument constructor
     * @param scheduler
     *        the scheduler handed to the three-argument constructor
     */
    public NettyMqttConnection(Executor executor, TaskScheduler scheduler) {
        this(executor, scheduler, new BasicMqttConnectionConfig());
    }

    /**
     * Create a connection with an explicit configuration.
     *
     * <p>
     * All three arguments are passed straight to the superclass constructor.
     * </p>
     *
     * @param executor
     *        the executor
     * @param scheduler
     *        the task scheduler
     * @param connectionConfig
     *        the connection configuration
     */
    public NettyMqttConnection(Executor executor, TaskScheduler scheduler, MqttConnectionConfig connectionConfig) {
        super(executor, scheduler, connectionConfig);
    }

    /**
     * Describe this connection by its configured UID, client ID, optional
     * username and server URI.
     *
     * @return a string of the form
     *         {@code NettyMqttConnection{uid=..., clientId=..., username=..., uri=...}};
     *         the braces are empty when no configuration is available and the
     *         username entry is omitted when it is {@code null}
     */
    @Override
    public String toString() {
        final StringBuilder out = new StringBuilder("NettyMqttConnection{");
        final BasicMqttConnectionConfig cfg = getConnectionConfig();
        if (cfg != null) {
            out.append("uid=").append(cfg.getUid());
            out.append(", clientId=").append(cfg.getClientId());
            if (cfg.getUsername() != null) {
                out.append(", username=").append(cfg.getUsername());
            }
            out.append(", uri=").append(cfg.getServerUri());
        }
        return out.append('}').toString();
    }

    /**
     * Translate a Netty connect return code into the equivalent
     * {@link MqttConnectReturnCode}.
     *
     * @param other
     *        the Netty return code, possibly {@code null}
     * @return the mapped code, or {@code null} when {@code other} is
     *         {@code null} or has no mapping here
     */
    private MqttConnectReturnCode returnCode(io.netty.handler.codec.mqtt.MqttConnectReturnCode other) {
        MqttConnectReturnCode mapped = null;
        if (other != null) {
            switch (other) {
                case CONNECTION_ACCEPTED:
                    mapped = MqttConnectReturnCode.Accepted;
                    break;
                case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
                    mapped = MqttConnectReturnCode.BadCredentials;
                    break;
                case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
                    mapped = MqttConnectReturnCode.ClientIdRejected;
                    break;
                case CONNECTION_REFUSED_NOT_AUTHORIZED:
                    mapped = MqttConnectReturnCode.NotAuthorized;
                    break;
                case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
                    mapped = MqttConnectReturnCode.ServerUnavailable;
                    break;
                case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
                    mapped = MqttConnectReturnCode.UnacceptableProtocolVersion;
                    break;
                default:
                    // any other code is deliberately left unmapped
                    break;
            }
        }
        return mapped;
    }

    /**
     * Create the runnable that performs (and, on failure, re-schedules) a
     * connection attempt.
     *
     * @param future
     *        the future the connection attempt completes
     * @return a new {@code ConnectScheduledTask} bound to {@code future}
     */
    protected Runnable createConnectScheduledTask(CompletableFuture<MqttConnectReturnCode> future) {
        return new ConnectScheduledTask(future);
    }

    /**
     * Build the Netty client configuration from a connection configuration.
     *
     * @param connConfig
     *        the connection configuration, possibly {@code null}
     * @return the client configuration, or {@code null} when
     *         {@code connConfig} is {@code null} or lacks a server URI or a
     *         non-empty client ID
     */
    private MqttClientConfig createClientConfig(MqttConnectionConfig connConfig) {
        if (connConfig == null) {
            return null;
        }
        final boolean haveUri = connConfig.getServerUri() != null;
        final boolean haveClientId = connConfig.getClientId() != null && !connConfig.getClientId().isEmpty();
        if (!(haveUri && haveClientId)) {
            log.info("Server URI and/or client ID not configured, cannot connect to MQTT server.");
            return null;
        }

        final MqttClientConfig config;
        if (connConfig.isUseSsl()) {
            config = new MqttClientConfig(createSslContext(connConfig.getSslService()));
        } else {
            config = new MqttClientConfig();
        }
        config.setCleanSession(connConfig.isCleanSession());
        config.setClientId(connConfig.getClientId());

        final MqttMessage will = connConfig.getLastWill();
        if (will != null) {
            // the last-will payload is carried as UTF-8 text
            final String willText = new String(will.getPayload(), Charset.forName("UTF-8"));
            final MqttQoS willQos = MqttQoS.valueOf(will.getQosLevel().getValue());
            config.setLastWill(new MqttLastWill(will.getTopic(), willText, will.isRetained(), willQos));
        }

        config.setMaxBytesInMessage(connConfig.getMaximumMessageSize());
        config.setPassword(connConfig.getPassword());

        switch (connConfig.getVersion()) {
            case Mqtt31: {
                config.setProtocolVersion(io.netty.handler.codec.mqtt.MqttVersion.MQTT_3_1);
                break;
            }
            case Mqtt5: {
                config.setProtocolVersion(io.netty.handler.codec.mqtt.MqttVersion.MQTT_5);
                final MqttProperty aliasMax = connConfig.getProperty(MqttPropertyType.TOPIC_ALIAS_MAXIMUM);
                if (aliasMax != null && aliasMax.getValue() != null) {
                    config.setMaximumTopicAliases((Integer) aliasMax.getValue());
                }
                break;
            }
            default: {
                config.setProtocolVersion(io.netty.handler.codec.mqtt.MqttVersion.MQTT_3_1_1);
                break;
            }
        }

        // the client's own reconnect is switched off; ConnectTask re-schedules attempts itself
        config.setReconnect(false);
        config.setReconnectDelay(connConfig.getReconnectDelaySeconds());
        config.setTimeoutSeconds(connConfig.getKeepAliveSeconds());
        config.setReadTimeoutSeconds(connConfig.getReadTimeoutSeconds());
        config.setWriteTimeoutSeconds(connConfig.getWriteTimeoutSeconds());
        config.setUsername(connConfig.getUsername());
        return config;
    }

    /**
     * Build a client-side SSL context, using the trust and key manager
     * factories of the given service when they are available.
     *
     * @param sslService
     *        the SSL service, possibly {@code null}
     * @return the SSL context
     * @throws CertificateException
     *         if the context cannot be built
     */
    private SslContext createSslContext(SSLService sslService) {
        try {
            final SslContextBuilder ctx = SslContextBuilder.forClient();
            if (sslService != null) {
                final TrustManagerFactory trust = sslService.getTrustManagerFactory();
                if (trust != null) {
                    ctx.trustManager(trust);
                }
                final KeyManagerFactory keys = sslService.getKeyManagerFactory();
                if (keys != null) {
                    ctx.keyManager(keys);
                }
            }
            return ctx.build();
        } catch (SSLException e) {
            throw new CertificateException("Error configuring SSL for MQTT connection: " + e.getMessage(), e);
        }
    }

    /**
     * Disconnect a client and shut down its event loop group.
     *
     * <p>
     * The disconnect is submitted as a task to the client's own event loop
     * group, where it waits up to the configured connect timeout. The group is
     * shut down afterwards whether or not the disconnect succeeded.
     * </p>
     *
     * @param c
     *        the client to close
     * @return a future completed with {@code null} once disconnected (or
     *         immediately when the client has no event loop group), or
     *         completed exceptionally if the disconnect fails
     */
    private java.util.concurrent.Future<?> closeClient(MqttClient c) {
        final CompletableFuture<Object> done = new CompletableFuture<>();
        final EventLoopGroup loop = c.getEventLoop();
        if (loop == null) {
            done.complete(null);
            return done;
        }
        loop.execute(() -> {
            try {
                c.disconnect().get(this.connectionConfig.getConnectTimeoutSeconds(), TimeUnit.SECONDS);
                done.complete(null);
            } catch (Exception e) {
                done.completeExceptionally(e);
            } finally {
                loop.shutdownGracefully();
            }
        });
        return done;
    }

    /**
     * Close the current client, if any, and forget it.
     *
     * @return the future of the close operation, or an already-completed
     *         future when there is no client
     */
    protected synchronized java.util.concurrent.Future<?> closeConnection() {
        final MqttClient current = this.client;
        if (current == null) {
            return CompletableFuture.completedFuture(null);
        }
        try {
            return closeClient(current);
        } finally {
            // cleared even when closeClient() throws
            this.client = null;
        }
    }

    /**
     * Handle a lost connection: log it, count it, notify the observer on the
     * executor, and trigger a reconfigure when reconnecting is enabled and
     * this connection has not been closed.
     *
     * @param cause
     *        the reason the connection was lost, possibly {@code null}
     */
    @Override
    public void connectionLost(Throwable cause) {
        final String reason;
        if (cause instanceof ChannelClosedException) {
            reason = "closed";
        } else if (cause != null) {
            reason = cause.toString();
        } else {
            reason = "unknown cause";
        }
        log.warn("Connection lost to MQTT server {}: {}", connectionConfig.getServerUri(), reason);

        final StatTracker stats = connectionConfig.getStats();
        if (stats != null) {
            stats.increment(MqttBasicCount.ConnectionLost);
        }

        final MqttConnectionObserver observer = this.connectionObserver;
        if (observer != null) {
            executor.execute(new ConnectionLostTask(cause, observer));
        }

        if (!isClosed() && connectionConfig.isReconnect()) {
            log.info("Resetting connection to MQTT server {} to schedule reconnect",
                    connectionConfig.getServerUri());
            reconfigure();
        }
    }

    /**
     * Handle a successful reconnect: log it, count it as a connection success,
     * and notify the observer on the executor.
     */
    @Override
    public void onSuccessfulReconnect() {
        log.warn("Reconnected to MQTT server {}", connectionConfig.getServerUri());

        final StatTracker stats = connectionConfig.getStats();
        if (stats != null) {
            stats.increment(MqttBasicCount.ConnectionSuccess);
        }

        final MqttConnectionObserver observer = this.connectionObserver;
        if (observer != null) {
            executor.execute(new ConnectionEstablishedTask(true, observer));
        }
    }

    /**
     * Test whether a connection is currently established.
     *
     * @return {@code true} when this connection is not closed, a client is
     *         present, and that client reports itself connected
     */
    public boolean isEstablished() {
        final MqttClient current;
        synchronized (this) {
            if (isClosed()) {
                return false;
            }
            current = this.client;
        }
        if (current == null) {
            return false;
        }
        // the connected state is read while holding the client's own monitor
        synchronized (current) {
            return current.isConnected();
        }
    }

    /**
     * Receive a message: update the received-message statistics and dispatch
     * the message to the configured handler on the executor.
     *
     * @param message
     *        the received message; statistics are skipped when {@code null}
     */
    public void onMqttMessage(MqttMessage message) {
        final StatTracker stats = connectionConfig.getStats();
        if (message != null && stats != null) {
            stats.increment(MqttBasicCount.MessagesReceived);
            final byte[] body = message.getPayload();
            if (body != null && body.length > 0) {
                stats.add(MqttBasicCount.PayloadBytesReceived, (long) body.length);
            }
        }

        final MqttMessageHandler target = this.messageHandler;
        if (target != null) {
            executor.execute(new MessageHandlerTask(message, target));
        }
    }

    /**
     * Publish a message.
     *
     * <p>
     * Delivery statistics are updated when the publish completes, if a
     * statistics tracker is configured. A message with a {@code null} payload
     * is published with an empty body.
     * </p>
     *
     * @param message
     *        the message to publish; {@code null} is ignored
     * @return a future for the publish operation: already completed when
     *         {@code message} is {@code null}; completed exceptionally with an
     *         {@link IllegalArgumentException} when the topic name is invalid
     *         or an {@link IOException} when no client is connected
     */
    public java.util.concurrent.Future<?> publish(MqttMessage message) {
        if (message == null) {
            return CompletableFuture.completedFuture(null);
        }
        try {
            MqttUtils.validateTopicName(message.getTopic(), getConnectionConfig().getVersion());
        } catch (IllegalArgumentException e) {
            final CompletableFuture<Void> invalid = new CompletableFuture<>();
            invalid.completeExceptionally(e);
            return invalid;
        }
        final MqttClient c = this.client;
        if (c == null) {
            final CompletableFuture<Void> notConnected = new CompletableFuture<>();
            notConnected.completeExceptionally(new IOException("Not connected to MQTT server."));
            return notConnected;
        }
        final byte[] payload = message.getPayload();
        // Unpooled.wrappedBuffer(byte[]) dereferences its argument, so a null
        // payload must be mapped to the shared empty buffer
        final Future<Void> delivery = c.publish(message.getTopic(),
                payload != null ? Unpooled.wrappedBuffer(payload) : Unpooled.EMPTY_BUFFER,
                NettyMqttUtils.qos(message.getQosLevel()), message.isRetained(), message.getProperties());
        final StatTracker stats = connectionConfig.getStats();
        if (stats != null) {
            delivery.addListener(new GenericFutureListener<Future<? super Void>>() {

                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (!future.isSuccess()) {
                        stats.increment(MqttBasicCount.MessagesDeliveredFail);
                        return;
                    }
                    stats.increment(MqttBasicCount.MessagesDelivered);
                    if (payload != null && payload.length > 0) {
                        stats.add(MqttBasicCount.PayloadBytesDelivered, (long) payload.length);
                    }
                }
            });
        }
        return delivery;
    }

    /**
     * Subscribe to a topic.
     *
     * @param topic
     *        the topic to subscribe to
     * @param qosLevel
     *        the quality of service level
     * @param handler
     *        the handler for messages on the topic; when {@code null} this
     *        connection itself is registered as the handler
     * @return a future for the subscribe operation, completed exceptionally
     *         with an {@link IOException} when no client is connected
     */
    public java.util.concurrent.Future<?> subscribe(String topic, MqttQos qosLevel, MqttMessageHandler handler) {
        final MqttClient c = this.client;
        if (c != null) {
            // a supplied handler is wrapped so received-message statistics are still tracked
            final MqttMessageHandler target = handler != null ? new StatsMessageHandler(handler) : this;
            return c.on(topic, target, NettyMqttUtils.qos(qosLevel));
        }
        final CompletableFuture<Object> notConnected = new CompletableFuture<>();
        notConnected.completeExceptionally(new IOException("Not connected to MQTT server."));
        return notConnected;
    }

    /**
     * Unsubscribe from a topic.
     *
     * @param topic
     *        the topic to unsubscribe from
     * @param handler
     *        the handler previously subscribed; when {@code null} this
     *        connection itself is used as the handler
     * @return a future for the unsubscribe operation, completed exceptionally
     *         with an {@link IOException} when no client is connected
     */
    public java.util.concurrent.Future<?> unsubscribe(String topic, MqttMessageHandler handler) {
        final MqttClient c = this.client;
        if (c != null) {
            // wrapped the same way as in subscribe(); StatsMessageHandler compares by delegate
            final MqttMessageHandler target = handler != null ? new StatsMessageHandler(handler) : this;
            return c.off(topic, target);
        }
        final CompletableFuture<Object> notConnected = new CompletableFuture<>();
        notConnected.completeExceptionally(new IOException("Not connected to MQTT server."));
        return notConnected;
    }

    /**
     * Get the I/O thread count.
     *
     * @return the number of threads given to the event loop group of each new
     *         client
     */
    public int getIoThreadCount() {
        return ioThreadCount;
    }

    /**
     * Set the I/O thread count.
     *
     * @param ioThreadCount
     *        the number of threads to give the event loop group of each new
     *        client; must not be negative
     * @throws IllegalArgumentException
     *         if {@code ioThreadCount} is less than {@code 0}
     */
    public void setIoThreadCount(int ioThreadCount) {
        final boolean negative = ioThreadCount < 0;
        if (negative) {
            throw new IllegalArgumentException("The ioThreadCount value must be >= 0");
        }
        this.ioThreadCount = ioThreadCount;
    }

    /**
     * Get the wire logging flag of this connection.
     *
     * @return {@code true} if wire logging is enabled on this connection
     */
    public boolean isWireLoggingEnabled() {
        return wireLogging;
    }

    /**
     * Set the wire logging flag of this connection.
     *
     * <p>
     * The flag is read when a client is created for a connection attempt.
     * </p>
     *
     * @param wireLogging
     *        {@code true} to enable wire logging on clients created afterwards
     */
    public void setWireLoggingEnabled(boolean wireLogging) {
        this.wireLogging = wireLogging;
    }

    /**
     * Runnable that hands a single {@link ConnectTask} over to the executor
     * each time it runs.
     */
    private final class ConnectScheduledTask
    implements Runnable {
        // one task instance is reused across runs, so its state carries over between attempts
        private final ConnectTask task;

        private ConnectScheduledTask(CompletableFuture<MqttConnectReturnCode> future) {
            task = new ConnectTask(future, this);
        }

        @Override
        public void run() {
            executor.execute(task);
        }
    }

    private final class ConnectionLostTask
    implements Runnable {
        private final Throwable cause;
        private final MqttConnectionObserver observer;

        private ConnectionLostTask(Throwable cause, MqttConnectionObserver observer) {
            this.cause = cause;
            this.observer = observer;
        }

        @Override
        public void run() {
            try {
                this.observer.onMqttServerConnectionLost((MqttConnection)NettyMqttConnection.this, NettyMqttConnection.this.connectionConfig.isReconnect(), this.cause);
            }
            catch (Throwable t) {
                Throwable root = t;
                while (root.getCause() != null) {
                    root = root.getCause();
                }
                NettyMqttConnection.this.log.error("Unhandled {} exception on connection loss observer {}", new Object[]{root.getClass().getSimpleName(), this.observer, t});
            }
        }
    }

    private final class ConnectionEstablishedTask
    implements Runnable {
        private final boolean reconnected;
        private final MqttConnectionObserver observer;

        private ConnectionEstablishedTask(boolean reconnected, MqttConnectionObserver observer) {
            this.reconnected = reconnected;
            this.observer = observer;
        }

        @Override
        public void run() {
            try {
                this.observer.onMqttServerConnectionEstablished((MqttConnection)NettyMqttConnection.this, this.reconnected);
            }
            catch (Throwable t) {
                Throwable root = t;
                while (root.getCause() != null) {
                    root = root.getCause();
                }
                NettyMqttConnection.this.log.error("Unhandled {} exception on connection establishment observer {}", new Object[]{root.getClass().getSimpleName(), this.observer, t});
            }
        }
    }

    private final class MessageHandlerTask
    implements Runnable {
        private final MqttMessage message;
        private final MqttMessageHandler handler;

        private MessageHandlerTask(MqttMessage message, MqttMessageHandler handler) {
            this.message = message;
            this.handler = handler;
        }

        @Override
        public void run() {
            try {
                this.handler.onMqttMessage(this.message);
            }
            catch (Exception e) {
                Throwable root = e;
                while (root.getCause() != null) {
                    root = root.getCause();
                }
                NettyMqttConnection.this.log.error("Unhandled exception in MQTT message handler {} on topic {}: {}", new Object[]{this.handler, this.message.getTopic(), root.getMessage(), e});
            }
        }
    }

    /**
     * Message handler wrapper that updates the received-message statistics
     * and then dispatches the message to a delegate on the executor.
     *
     * <p>
     * Equality and hash code are those of the delegate, so two wrappers around
     * the same delegate compare equal.
     * </p>
     */
    private final class StatsMessageHandler
    implements MqttMessageHandler {
        private final MqttMessageHandler delegate;

        private StatsMessageHandler(MqttMessageHandler delegate) {
            this.delegate = delegate;
        }

        /**
         * Count the message and hand it to the delegate on the executor.
         *
         * @param message
         *        the received message; statistics are skipped when
         *        {@code null}, matching the enclosing connection's own handler
         */
        public void onMqttMessage(MqttMessage message) {
            StatTracker s = NettyMqttConnection.this.connectionConfig.getStats();
            // guard against a null message so the statistics code cannot prevent the dispatch below
            if (s != null && message != null) {
                s.increment(MqttBasicCount.MessagesReceived);
                byte[] payload = message.getPayload();
                if (payload != null && payload.length > 0) {
                    s.add(MqttBasicCount.PayloadBytesReceived, (long) payload.length);
                }
            }
            NettyMqttConnection.this.executor.execute(new MessageHandlerTask(message, this.delegate));
        }

        @Override
        public int hashCode() {
            return this.delegate.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            // unwrap another wrapper so the comparison is always delegate-to-delegate
            if (obj instanceof StatsMessageHandler) {
                obj = ((StatsMessageHandler) obj).delegate;
            }
            return this.delegate.equals(obj);
        }
    }

    private final class ConnectTask
    implements Runnable {
        private final CompletableFuture<MqttConnectReturnCode> connectFuture;
        private final ConnectScheduledTask scheduledTask;
        private long reconnectDelay = 0L;

        private ConnectTask(CompletableFuture<MqttConnectReturnCode> connectFuture, ConnectScheduledTask scheduledTask) {
            this.connectFuture = connectFuture;
            this.scheduledTask = scheduledTask;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            NettyMqttConnection nettyMqttConnection = NettyMqttConnection.this;
            synchronized (nettyMqttConnection) {
                if (NettyMqttConnection.this.isClosed() || this.connectFuture != NettyMqttConnection.this.connectFuture()) {
                    this.connectFuture.completeExceptionally(new RuntimeException("Connect cancelled."));
                    return;
                }
            }
            if (this.reconnectDelay < (long)NettyMqttConnection.this.connectionConfig.getReconnectDelaySeconds() * 30000L) {
                int step = Math.max(1, NettyMqttConnection.this.connectionConfig.getReconnectDelaySeconds() / 2);
                this.reconnectDelay += (long)step * 1000L;
            }
            Exception t = null;
            MqttConnectResult r = null;
            MqttClientConfig config = null;
            try {
                config = NettyMqttConnection.this.createClientConfig((MqttConnectionConfig)NettyMqttConnection.this.connectionConfig);
            }
            catch (RuntimeException e) {
                NettyMqttConnection.this.log.warn("Invalid {} MQTT configuration: {}", new Object[]{NettyMqttConnection.this.getUid(), e.toString(), e});
                t = e;
            }
            if (config != null) {
                StatTracker s;
                block20: {
                    MqttClient client = null;
                    s = NettyMqttConnection.this.connectionConfig.getStats();
                    try {
                        client = MqttClient.create(config, NettyMqttConnection.this);
                        client.setWireLogging(NettyMqttConnection.this.wireLogging || NettyMqttConnection.this.connectionConfig.isWireLoggingEnabled());
                        client.setCallback(NettyMqttConnection.this);
                        client.setEventLoop((EventLoopGroup)new NioEventLoopGroup(NettyMqttConnection.this.ioThreadCount, (ThreadFactory)new CustomizableThreadFactory("MQTT-" + NettyMqttConnection.this.getUid() + "-")));
                        if (s != null) {
                            s.increment((Enum)MqttBasicCount.ConnectionAttempts);
                        }
                        NettyMqttConnection.this.log.info("Connecting to MQTT server {}...", (Object)NettyMqttConnection.this.connectionConfig.getServerUri());
                        Future<MqttConnectResult> f = client.connect(NettyMqttConnection.this.connectionConfig.getHost(), NettyMqttConnection.this.connectionConfig.getPort());
                        r = (MqttConnectResult)f.get(NettyMqttConnection.this.connectionConfig.getConnectTimeoutSeconds(), TimeUnit.SECONDS);
                        if (r.isSuccess()) {
                            NettyMqttConnection.this.log.info("Connected to MQTT server {}", (Object)NettyMqttConnection.this.connectionConfig.getServerUri());
                            this.connectComplete(client, r, null);
                            return;
                        }
                        t = new RuntimeException("Server refused connection: " + r.getReturnCode());
                    }
                    catch (Exception e) {
                        t = e;
                        if (client == null) break block20;
                        try {
                            NettyMqttConnection.this.closeClient(client).get(NettyMqttConnection.this.connectionConfig.getConnectTimeoutSeconds(), TimeUnit.SECONDS);
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                    }
                }
                if (s != null) {
                    s.increment((Enum)MqttBasicCount.ConnectionFail);
                }
                if (NettyMqttConnection.this.connectionConfig.isReconnect()) {
                    NettyMqttConnection.this.log.info("Failed to connect to MQTT server {} ({}), will try again in {}s", new Object[]{NettyMqttConnection.this.connectionConfig.getServerUri(), t instanceof TimeoutException ? "timeout" : t.getMessage(), String.format("%.01f", (double)this.reconnectDelay / 1000.0)});
                } else {
                    NettyMqttConnection.this.log.info("Failed to connect to MQTT server {} (), will not try again.", (Object)NettyMqttConnection.this.connectionConfig.getServerUri(), (Object)(t instanceof TimeoutException ? "timeout" : t.getMessage()));
                }
            } else {
                NettyMqttConnection.this.log.info("{} MQTT configuration incomplete, will not connect.", (Object)NettyMqttConnection.this.getUid());
            }
            if (NettyMqttConnection.this.connectionConfig.isReconnect() && config != null) {
                NettyMqttConnection.this.scheduler.schedule((Runnable)this.scheduledTask, new Date(System.currentTimeMillis() + this.reconnectDelay));
            } else {
                this.connectComplete(null, r, t);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void connectComplete(MqttClient client, MqttConnectResult result, Throwable t) {
            NettyMqttConnection nettyMqttConnection = NettyMqttConnection.this;
            synchronized (nettyMqttConnection) {
                NettyMqttConnection.this.client = client;
                if (this.connectFuture != null) {
                    if (t != null) {
                        this.connectFuture.completeExceptionally(t);
                    } else {
                        MqttConnectionObserver observer;
                        MqttConnectReturnCode code = result != null ? NettyMqttConnection.this.returnCode(result.getReturnCode()) : null;
                        this.connectFuture.complete(code);
                        StatTracker s = NettyMqttConnection.this.connectionConfig.getStats();
                        if (s != null) {
                            s.increment((Enum)MqttBasicCount.ConnectionSuccess);
                        }
                        if ((observer = NettyMqttConnection.this.connectionObserver) != null) {
                            NettyMqttConnection.this.executor.execute(new ConnectionEstablishedTask(false, observer));
                        }
                    }
                }
            }
        }
    }
}

