/*
 * Decompiled with CFR 0.152.
 */
package stream.net;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Processor;
import stream.data.Data;
import stream.data.DataFactory;
import stream.io.DataStream;

public class UDPStream
implements DataStream,
Runnable {
    static Logger log = LoggerFactory.getLogger(UDPStream.class);
    String protocol = "udp";
    String address = "0.0.0.0";
    Integer port;
    DatagramSocket socket;
    boolean running = false;
    Integer packetSize = 1024;
    Integer backlog = 100;
    final LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue();
    final List<Processor> processors = new ArrayList<Processor>();
    Thread t;

    @Override
    public Map<String, Class<?>> getAttributes() {
        return new HashMap();
    }

    @Override
    public void init() throws Exception {
        this.socket = new DatagramSocket(this.port);
        if (this.running && this.t.isAlive()) {
            log.error("UDP-Stream {} already running.", this);
            return;
        }
        this.t = new Thread(this);
        this.t.start();
    }

    @Override
    public Data readNext() throws Exception {
        return this.readNext(DataFactory.create());
    }

    @Override
    public Data readNext(Data datum) throws Exception {
        Data item = null;
        while (item == null) {
            try {
                item = this.queue.take();
            }
            catch (InterruptedException ie) {
                if (!this.socket.isClosed()) continue;
                return null;
            }
        }
        datum.putAll(item);
        return datum;
    }

    @Override
    public List<Processor> getPreprocessors() {
        return this.processors;
    }

    @Override
    public void close() throws Exception {
        this.running = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.running = true;
        while (this.running) {
            try {
                byte[] buf = new byte[this.packetSize.intValue()];
                DatagramPacket packet = new DatagramPacket(buf, this.packetSize);
                this.socket.receive(packet);
                Data item = DataFactory.create();
                int off = packet.getOffset();
                int len = packet.getLength() - off;
                byte[] data = new byte[len];
                System.arraycopy(packet.getData(), off, data, 0, len);
                item.put("udp:data", data);
                item.put("udp:source", packet.getAddress().getHostAddress());
                item.put("udp:port", packet.getPort());
                item.put("udp:size", len);
                LinkedBlockingQueue<Data> linkedBlockingQueue = this.queue;
                synchronized (linkedBlockingQueue) {
                    if (!this.queue.isEmpty() && this.queue.remainingCapacity() < 1) {
                        this.queue.remove();
                    }
                    this.queue.put(item);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Integer getPort() {
        return this.port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public Integer getBacklog() {
        return this.backlog;
    }

    public void setBacklog(Integer backlog) {
        this.backlog = backlog;
    }
}

