package stream.net;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.objectweb.asm.Opcodes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.Processor;
import stream.data.DataFactory;
import stream.io.DataStream;

/* loaded from: input_file:stream/net/UDPStream.class */
public class UDPStream implements DataStream, Runnable {
    static Logger log = LoggerFactory.getLogger(UDPStream.class);
    protected Integer port;
    protected DatagramSocket socket;
    protected Thread t;
    protected String id;
    protected String address = "0.0.0.0";
    protected boolean running = false;
    protected Integer packetSize = Integer.valueOf(Opcodes.ACC_ABSTRACT);
    protected Integer backlog = 100;
    protected final LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>();
    protected final List<Processor> processors = new ArrayList();

    @Override // stream.io.DataStream
    public String getId() {
        return this.id;
    }

    @Override // stream.io.DataStream
    public void setId(String str) {
        this.id = str;
    }

    @Override // stream.io.DataStream
    public Map<String, Class<?>> getAttributes() {
        return new HashMap();
    }

    @Override // stream.io.DataStream
    public void init() throws Exception {
        this.socket = new DatagramSocket(this.port.intValue());
        if (this.running && this.t.isAlive()) {
            log.error("UDP-Stream {} already running.", this);
        } else {
            this.t = new Thread(this);
            this.t.start();
        }
    }

    @Override // stream.io.DataStream
    public Data readNext() throws Exception {
        return readNext(DataFactory.create());
    }

    @Override // stream.io.DataStream
    public Data readNext(Data data) throws Exception {
        Data data2 = null;
        while (data2 == null) {
            try {
                data2 = this.queue.take();
            } catch (InterruptedException e) {
                if (this.socket.isClosed()) {
                    return null;
                }
            }
        }
        data.putAll(data2);
        return data;
    }

    @Override // stream.io.DataStream
    public List<Processor> getPreprocessors() {
        return this.processors;
    }

    @Override // stream.io.DataStream
    public void close() throws Exception {
        this.running = false;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.running = true;
        while (this.running) {
            try {
                DatagramPacket datagramPacket = new DatagramPacket(new byte[this.packetSize.intValue()], this.packetSize.intValue());
                this.socket.receive(datagramPacket);
                Data create = DataFactory.create();
                int offset = datagramPacket.getOffset();
                int length = datagramPacket.getLength() - offset;
                byte[] bArr = new byte[length];
                System.arraycopy(datagramPacket.getData(), offset, bArr, 0, length);
                create.put("udp:data", bArr);
                create.put("udp:source", datagramPacket.getAddress().getHostAddress());
                create.put("udp:port", Integer.valueOf(datagramPacket.getPort()));
                create.put("udp:size", Integer.valueOf(length));
                synchronized (this.queue) {
                    if (!this.queue.isEmpty() && this.queue.remainingCapacity() < 1) {
                        this.queue.remove();
                    }
                    this.queue.put(create);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String str) {
        this.address = str;
    }

    public Integer getPort() {
        return this.port;
    }

    public void setPort(Integer num) {
        this.port = num;
    }

    public Integer getBacklog() {
        return this.backlog;
    }

    public void setBacklog(Integer num) {
        this.backlog = num;
    }
}
