/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.netty;

import java.util.Map;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.netty.NettySourceConfig;
import org.apache.pulsar.io.netty.server.NettyServer;

@Connector(name="netty", type=IOType.SOURCE, help="A simple Netty Source connector to listen for incoming messages and write to user-defined Pulsar topic", configClass=NettySourceConfig.class)
public class NettySource
extends PushSource<byte[]> {
    private NettyServer nettyServer;
    private Thread thread;

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        NettySourceConfig nettySourceConfig = NettySourceConfig.load(config);
        if (nettySourceConfig.getType() == null || nettySourceConfig.getHost() == null || nettySourceConfig.getPort() <= 0) {
            throw new IllegalArgumentException("Required property not set.");
        }
        this.thread = new Thread(new PulsarServerRunnable(nettySourceConfig, this));
        this.thread.start();
    }

    public void close() throws Exception {
        this.nettyServer.shutdownGracefully();
    }

    private class PulsarServerRunnable
    implements Runnable {
        private NettySourceConfig nettySourceConfig;
        private NettySource nettySource;

        public PulsarServerRunnable(NettySourceConfig nettySourceConfig, NettySource nettySource2) {
            this.nettySourceConfig = nettySourceConfig;
            this.nettySource = nettySource2;
        }

        @Override
        public void run() {
            NettySource.this.nettyServer = new NettyServer.Builder().setType(NettyServer.Type.valueOf(this.nettySourceConfig.getType().toUpperCase())).setHost(this.nettySourceConfig.getHost()).setPort(this.nettySourceConfig.getPort()).setNumberOfThreads(this.nettySourceConfig.getNumberOfThreads()).setNettySource(this.nettySource).build();
            NettySource.this.nettyServer.run();
        }
    }
}

