/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.logging.LoggingHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.NetServerBase;
import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.TCPMetrics;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.impl.MqttConnection;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;

/**
 * {@link MqttServer} implementation built on top of {@link NetServerBase}.
 *
 * <p>The server installs the Netty MQTT encoder/decoder on every accepted channel and
 * hands each {@link MqttConnection} the endpoint handler registered through
 * {@link #endpointHandler(Handler)}.
 *
 * <p>The endpoint handler is captured when {@code listen} is called, so it has to be
 * registered before listening; a handler set afterwards is not picked up by the
 * connection handler created for that {@code listen} call.
 */
public class MqttServerImpl
extends NetServerBase<MqttConnection>
implements MqttServer {
    /** Handler notified for remote MQTT endpoints; written by the synchronized setter below. */
    private Handler<MqttEndpoint> endpointHandler;

    /**
     * Creates a new server.
     *
     * @param vertx   Vert.x instance; it is cast to {@link VertxInternal}
     * @param options server options, passed to the base class as {@link NetServerOptions}
     */
    public MqttServerImpl(Vertx vertx, MqttServerOptions options) {
        super((VertxInternal) vertx, (NetServerOptions) options);
    }

    /**
     * Starts listening on the port and host taken from the options, ignoring the outcome.
     *
     * @return this server
     */
    @Override
    public MqttServer listen() {
        return listen(ar -> { });
    }

    /**
     * Starts listening on the given port and host, ignoring the outcome.
     *
     * @param port port to bind
     * @param host host to bind
     * @return this server
     */
    @Override
    public MqttServer listen(int port, String host) {
        return listen(port, host, ar -> { });
    }

    /**
     * Starts listening on the given port and the host taken from the options,
     * ignoring the outcome.
     *
     * @param port port to bind
     * @return this server
     */
    @Override
    public MqttServer listen(int port) {
        return listen(port, ar -> { });
    }

    /**
     * Starts listening on the given port and the host taken from the options.
     *
     * @param port          port to bind
     * @param listenHandler notified with the outcome of the bind
     * @return this server
     */
    @Override
    public MqttServer listen(int port, Handler<AsyncResult<MqttServer>> listenHandler) {
        return listen(port, options.getHost(), listenHandler);
    }

    /**
     * Starts listening on the port and host taken from the options.
     *
     * @param listenHandler notified with the outcome of the bind
     * @return this server
     */
    @Override
    public MqttServer listen(Handler<AsyncResult<MqttServer>> listenHandler) {
        return listen(options.getPort(), listenHandler);
    }

    /**
     * Starts listening on the given port and host. All other {@code listen} overloads
     * end up here.
     *
     * @param port          port to bind
     * @param host          host to bind
     * @param listenHandler notified with the outcome of the bind, carrying this server as
     *                      the result value; a {@code null} handler is ignored
     * @return this server
     */
    @Override
    public MqttServer listen(int port, String host, Handler<AsyncResult<MqttServer>> listenHandler) {
        // Snapshot the handler now: connections accepted later use the value seen here.
        // NOTE(review): this read is not synchronized although the setter is — confirm
        // that handlers are always registered before listen() on the same thread.
        Handler<MqttEndpoint> currentEndpointHandler = endpointHandler;
        Handler<MqttConnection> connectionHandler =
            connection -> connection.setEndpointHandler(currentEndpointHandler);

        listen(connectionHandler, port, host, ar -> {
            if (listenHandler != null) {
                // Re-type the base class result so the caller receives this server.
                listenHandler.handle(ar.map((MqttServer) this));
            }
        });
        return this;
    }

    /**
     * Registers the handler to be passed to every connection created by a subsequent
     * {@code listen} call.
     *
     * @param handler endpoint handler
     * @return this server
     */
    @Override
    public synchronized MqttServer endpointHandler(Handler<MqttEndpoint> handler) {
        this.endpointHandler = handler;
        return this;
    }

    /**
     * Forwards a received message to the connection it arrived on.
     *
     * @param conn connection the message belongs to
     * @param msg  received message, as produced by {@link #safeObject}
     */
    protected void handleMsgReceived(MqttConnection conn, Object msg) {
        conn.handleMessage(msg);
    }

    /**
     * Creates the connection object for a channel.
     *
     * @param vertx   Vert.x instance
     * @param channel Netty channel of the connection
     * @param context supplied context; not used by this implementation
     * @param helper  SSL helper; not used by this implementation
     * @param metrics TCP metrics passed to the connection
     * @return the new connection
     */
    protected MqttConnection createConnection(VertxInternal vertx, Channel channel, ContextImpl context, SSLHelper helper, TCPMetrics metrics) {
        // NOTE(review): the supplied context is ignored in favour of
        // vertx.getOrCreateContext() — confirm this is intentional.
        return new MqttConnection(vertx, channel, vertx.getOrCreateContext(), metrics);
    }

    /**
     * Populates the channel pipeline: optional SSL handler, optional logging handler,
     * then the MQTT encoder and decoder, in that order.
     *
     * @param pipeline pipeline to populate
     */
    protected void initChannel(ChannelPipeline pipeline) {
        if (sslHelper.isSSL()) {
            pipeline.addLast("ssl", sslHelper.createSslHandler(vertx));
        }
        if (logEnabled) {
            pipeline.addLast("logging", new LoggingHandler());
        }
        pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE);
        // A fresh decoder is created for every pipeline.
        pipeline.addLast("mqttDecoder", new MqttDecoder());
    }

    /**
     * Converts successfully decoded SUBSCRIBE, UNSUBSCRIBE and PUBLISH messages from the
     * Netty types into the corresponding {@code io.vertx.mqtt.messages} types.
     *
     * <p>Anything else is returned unchanged: objects that are not {@link MqttMessage},
     * messages whose decoder result is not both successful and finished, and all other
     * message types.
     *
     * @param msg       object read from the channel
     * @param allocator allocator handed to {@link VertxHandler#safeBuffer} for PUBLISH payloads
     * @return the converted message, or {@code msg} itself when no conversion applies
     */
    protected Object safeObject(Object msg, ByteBufAllocator allocator) {
        if (!(msg instanceof MqttMessage)) {
            return msg;
        }
        MqttMessage mqttMessage = (MqttMessage) msg;
        DecoderResult result = mqttMessage.decoderResult();
        if (!result.isSuccess() || !result.isFinished()) {
            // Failed or partial decode: pass the message through untouched.
            return msg;
        }

        switch (mqttMessage.fixedHeader().messageType()) {
            case SUBSCRIBE: {
                MqttSubscribeMessage subscribe = (MqttSubscribeMessage) mqttMessage;
                return io.vertx.mqtt.messages.MqttSubscribeMessage.create(
                    subscribe.variableHeader().messageId(),
                    subscribe.payload().topicSubscriptions());
            }
            case UNSUBSCRIBE: {
                io.netty.handler.codec.mqtt.MqttUnsubscribeMessage unsubscribe =
                    (io.netty.handler.codec.mqtt.MqttUnsubscribeMessage) mqttMessage;
                return MqttUnsubscribeMessage.create(
                    unsubscribe.variableHeader().messageId(),
                    unsubscribe.payload().topics());
            }
            case PUBLISH: {
                MqttPublishMessage publish = (MqttPublishMessage) mqttMessage;
                ByteBuf payload = VertxHandler.safeBuffer(publish.payload(), allocator);
                return io.vertx.mqtt.messages.MqttPublishMessage.create(
                    publish.variableHeader().messageId(),
                    publish.fixedHeader().qosLevel(),
                    publish.fixedHeader().isDup(),
                    publish.fixedHeader().isRetain(),
                    publish.variableHeader().topicName(),
                    payload);
            }
            default:
                // Every other message type is delivered as the Netty object.
                return msg;
        }
    }
}

