package org.apache.pulsar.io.netty;

import java.util.Map;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.netty.server.NettyServer;

/**
 * Pulsar IO push source that starts a {@link NettyServer} on a background thread.
 *
 * <p>{@link #open(Map, SourceContext)} validates the configuration and starts a worker thread
 * that builds and runs the server; the server is handed a reference to this source.
 * {@link #close()} asks the server to shut down gracefully.
 *
 * <p>The server reference is published by the worker thread and read by whichever thread calls
 * {@link #close()}, so it is kept in a {@code volatile} field.
 */
@Connector(name = "netty", type = IOType.SOURCE, help = "A simple Netty Source connector to listen for incoming messages and write to user-defined Pulsar topic", configClass = NettySourceConfig.class)
public class NettySource extends PushSource<byte[]> {
    /** Server created by the worker thread; {@code null} until the worker has built it. */
    private volatile NettyServer nettyServer;

    /** Worker thread that builds and runs the server; {@code null} until {@code open} succeeds. */
    private Thread thread;

    /**
     * Task that builds the {@link NettyServer} from the supplied configuration, publishes it on
     * the owning source, and then runs it.
     */
    private static final class PulsarServerRunnable implements Runnable {
        private final NettySourceConfig nettySourceConfig;
        private final NettySource nettySource;

        /**
         * @param nettySourceConfig configuration providing type, host, port and thread count
         * @param nettySource       source that owns the server and is passed to the server builder
         */
        public PulsarServerRunnable(NettySourceConfig nettySourceConfig, NettySource nettySource) {
            this.nettySourceConfig = nettySourceConfig;
            this.nettySource = nettySource;
        }

        @Override // java.lang.Runnable
        public void run() {
            // Locale.ROOT keeps the enum lookup independent of the JVM default locale
            // (e.g. a Turkish locale would otherwise upper-case 'i' to a dotted capital).
            NettyServer.Type serverType = NettyServer.Type.valueOf(
                    this.nettySourceConfig.getType().toUpperCase(java.util.Locale.ROOT));

            NettyServer server = new NettyServer.Builder()
                    .setType(serverType)
                    .setHost(this.nettySourceConfig.getHost())
                    .setPort(this.nettySourceConfig.getPort())
                    .setNumberOfThreads(this.nettySourceConfig.getNumberOfThreads())
                    .setNettySource(this.nettySource)
                    .build();

            // Publish before running so close() can reach the server while it is running.
            this.nettySource.nettyServer = server;
            server.run();
        }
    }

    /**
     * Loads the connector configuration and starts the server on a new thread.
     *
     * @param map           raw connector configuration passed to {@code NettySourceConfig.load}
     * @param sourceContext source context supplied by the caller (not used by this method)
     * @throws IllegalArgumentException if the type or host is {@code null}, or the port is not
     *                                  a positive number
     * @throws Exception                if loading the configuration fails
     */
    @Override // org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        NettySourceConfig load = NettySourceConfig.load(map);
        if (load.getType() == null || load.getHost() == null || load.getPort() <= 0) {
            throw new IllegalArgumentException("Required property not set.");
        }
        this.thread = new Thread(new PulsarServerRunnable(load, this), "pulsar-netty-source");
        this.thread.start();
    }

    /**
     * Requests a graceful shutdown of the server.
     *
     * <p>Does nothing if the server has not been created, e.g. when {@code open} was never
     * called, failed validation, or the worker thread has not built the server yet.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        // Read the volatile field once so the null check and the call see the same instance.
        NettyServer server = this.nettyServer;
        if (server != null) {
            server.shutdownGracefully();
        }
    }
}
