/*
 * Decompiled with CFR 0.152.
 */
package stream.net;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.data.DataFactory;
import stream.io.AbstractStream;
import stream.io.SourceURL;

/**
 * A stream that listens on a UDP port and turns every received datagram into
 * a {@link Data} item.
 *
 * <p>A background thread (started by {@link #init()}) receives datagrams and
 * places one item per datagram into an internal queue. Each item carries the
 * keys {@code udp:data} (payload bytes), {@code udp:source} (sender host
 * address), {@code udp:port} (sender port) and {@code udp:size} (payload
 * length). {@link #read()} and {@link #readNext()} hand these items out.
 *
 * <p>At most {@code backlog} items are buffered; when the buffer is full the
 * oldest item is discarded in favour of the newest one.
 *
 * <p>NOTE(review): the {@code address} property is stored but not used when
 * binding the socket; the socket is bound using the port only.
 */
public class UDPStream
extends AbstractStream
implements Runnable {
    static Logger log = LoggerFactory.getLogger(UDPStream.class);

    /** Interval at which blocked readers re-check whether the stream was closed. */
    private static final long POLL_INTERVAL_MS = 100L;

    protected String address = "0.0.0.0";
    protected Integer port;
    /** Written by init()/close(), read by the receiver and reader threads. */
    protected volatile DatagramSocket socket;
    /** Loop flag of the receiver thread; volatile because close() runs on another thread. */
    protected volatile boolean running = false;
    /** Size of the receive buffer in bytes; longer datagrams are cut to this size by the socket. */
    protected Integer packetSize = 1024;
    /** Maximum number of buffered items; {@code null} or values below 1 disable the limit. */
    protected Integer backlog = 100;
    protected final LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<Data>();
    protected Thread t;
    protected String id;

    public UDPStream() {
        super((SourceURL)null);
    }

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Binds the UDP socket to the configured port and starts the receiver
     * thread. Calling this method while the receiver is already running only
     * logs an error and leaves the running receiver untouched.
     *
     * @throws IllegalStateException if no port has been configured
     * @throws Exception if the socket cannot be bound
     */
    @Override
    public void init() throws Exception {
        // Check BEFORE binding: binding first would either fail on the port
        // that is already in use or replace the reference to the live socket.
        if (this.running && this.t != null && this.t.isAlive()) {
            log.error("UDP-Stream {} already running.", (Object)this);
            return;
        }
        if (this.port == null) {
            throw new IllegalStateException("No port configured for UDP stream '" + this.id + "'");
        }
        // Release a socket left over from a previous run so the port can be re-bound.
        DatagramSocket stale = this.socket;
        if (stale != null && !stale.isClosed()) {
            stale.close();
        }
        this.socket = new DatagramSocket(this.port.intValue());
        // Set the flag here as well, so a second init() racing with the
        // thread start-up is recognised as "already running".
        this.running = true;
        this.t = new Thread(this);
        this.t.start();
    }

    /**
     * Returns a copy of the next received item, waiting until one is
     * available.
     *
     * @return a fresh {@link Data} instance holding the item's entries, or
     *         {@code null} once the socket has been closed and no buffered
     *         item is left
     */
    @Override
    public Data read() throws Exception {
        Data item = null;
        while (item == null) {
            try {
                item = this.awaitItem();
                if (item == null) {
                    // Socket closed and buffer drained: end of stream.
                    return null;
                }
            }
            catch (InterruptedException ie) {
                // As before, an interrupt only ends the read when the stream
                // has been closed; otherwise waiting is resumed.
                if (!this.isSocketClosed()) continue;
                Thread.currentThread().interrupt();
                return null;
            }
        }
        Data datum = DataFactory.create();
        datum.putAll(item);
        return datum;
    }

    /**
     * Stops the receiver and releases the UDP socket. Closing the socket
     * makes a pending {@code receive()} in the receiver thread fail, which
     * ends the receive loop, and lets blocked readers return {@code null}.
     */
    @Override
    public void close() throws Exception {
        this.running = false;
        DatagramSocket sock = this.socket;
        if (sock != null) {
            sock.close();
        }
    }

    /**
     * Receive loop executed by the background thread: receives datagrams
     * until the stream is closed and enqueues one item per datagram.
     */
    @Override
    public void run() {
        DatagramSocket sock = this.socket;
        if (sock == null) {
            log.error("UDP-Stream {} has no socket, call init() first.", (Object)this);
            return;
        }
        this.running = true;
        // The closed-check also covers a close() that happened before this
        // thread got to set the flag above.
        while (this.running && !sock.isClosed()) {
            try {
                byte[] buf = new byte[this.packetSize.intValue()];
                DatagramPacket packet = new DatagramPacket(buf, buf.length);
                sock.receive(packet);
                Data item = DataFactory.create();
                int off = packet.getOffset();
                // getLength() already is the number of received bytes; it
                // must not be reduced by the offset.
                int len = packet.getLength();
                byte[] data = new byte[len];
                System.arraycopy(packet.getData(), off, data, 0, len);
                item.put("udp:data", data);
                item.put("udp:source", packet.getAddress().getHostAddress());
                item.put("udp:port", packet.getPort());
                item.put("udp:size", len);
                this.enqueue(item);
            }
            catch (Exception e) {
                if (!this.running || sock.isClosed()) {
                    // Expected: close() interrupted the pending receive().
                    break;
                }
                log.error("Failed to receive UDP packet on port " + this.port, e);
            }
        }
    }

    /** Appends an item to the queue, discarding the oldest items once the backlog is exceeded. */
    private void enqueue(Data item) {
        Integer configured = this.backlog;
        int limit = configured == null ? 0 : configured.intValue();
        LinkedBlockingQueue<Data> buffer = this.queue;
        synchronized (buffer) {
            // The queue itself is unbounded, so the backlog is enforced here.
            while (limit > 0 && buffer.size() >= limit) {
                buffer.poll();
            }
            buffer.offer(item);
        }
    }

    /** Tells whether a socket has been created and closed again. */
    private boolean isSocketClosed() {
        DatagramSocket sock = this.socket;
        return sock != null && sock.isClosed();
    }

    /**
     * Waits for the next queued item, waking up periodically to notice that
     * the stream was closed. Returns {@code null} when the socket is closed
     * and the queue is empty.
     */
    private Data awaitItem() throws InterruptedException {
        while (true) {
            Data item = this.queue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            if (item != null) {
                return item;
            }
            if (this.isSocketClosed()) {
                // Final non-blocking look for an item that arrived between
                // the timed poll and the closed-check.
                return this.queue.poll();
            }
        }
    }

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Integer getPort() {
        return this.port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public Integer getBacklog() {
        return this.backlog;
    }

    public void setBacklog(Integer backlog) {
        this.backlog = backlog;
    }

    /**
     * Returns the next received item itself (not a copy), waiting until one
     * is available.
     *
     * @return the next item, or {@code null} once the socket has been closed
     *         and no buffered item is left
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public Data readNext() throws Exception {
        return this.awaitItem();
    }
}

