/*
 * Decompiled with CFR 0.152.
 */
package stream.runtime.rpc;

import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.runtime.rpc.ContainerAnnouncement;

/**
 * Locates containers by sending a UDP broadcast query and collecting the
 * {@link ContainerAnnouncement} replies.
 *
 * <p>A reply is only accepted after a test TCP connection to the announced
 * host and port succeeded. Accepted containers are remembered in two maps
 * (announcement by container name, and last-checked timestamp by the
 * announcement's string form); accessors return snapshot copies taken while
 * holding the respective map's monitor.
 *
 * <p>NOTE(review): {@link #run()} and {@link #discover()} both receive from
 * the same datagram socket, so a started thread may consume replies that
 * {@code discover()} is waiting for - confirm whether the thread is meant to
 * be started alongside {@code discover()} calls.
 */
public class Discovery extends Thread {
    static Logger log = LoggerFactory.getLogger(Discovery.class);

    /** Limited-broadcast address the query is sent to. */
    private static final String BROADCAST_ADDRESS = "255.255.255.255";
    /** Number of receive attempts made per {@link #discover()} call. */
    private static final int MAX_RECEIVE_ATTEMPTS = 5;
    /** Socket receive timeout (ms) applied while waiting for replies. */
    private static final int RECEIVE_TIMEOUT_MS = 100;
    /** Size in bytes of the buffer used for incoming datagrams. */
    private static final int RECEIVE_BUFFER_SIZE = 1024;
    /** Connect timeout (ms) used when the system property is unusable. */
    private static final int FALLBACK_CONNECT_TIMEOUT_MS = 100;

    // volatile so that a change made by another thread is seen by run()
    volatile boolean running = true;
    final Map<String, Long> alive = new LinkedHashMap<String, Long>();
    final Map<String, ContainerAnnouncement> containers = new LinkedHashMap<String, ContainerAnnouncement>();
    final DatagramSocket discovery;
    Long interval = 1000L;
    int count = 1;
    int announcementPort = 9200;

    /**
     * Creates a discovery instance that queries announcement port 9200.
     *
     * @throws Exception if the datagram socket cannot be created or configured
     */
    public Discovery() throws Exception {
        this(9200);
    }

    /**
     * Creates a discovery instance bound to an ephemeral local UDP port with
     * broadcasting enabled.
     *
     * @param announcementPort the UDP port the broadcast query is sent to
     * @throws Exception if the datagram socket cannot be created or configured
     */
    public Discovery(int announcementPort) throws Exception {
        this.announcementPort = announcementPort;
        this.discovery = new DatagramSocket(0);
        this.discovery.setBroadcast(true);
    }

    /**
     * Broadcasts a container query and collects the replies.
     *
     * <p>Previously recorded containers are dropped first. After sending the
     * query, up to {@value #MAX_RECEIVE_ATTEMPTS} receive attempts are made,
     * each waiting at most {@value #RECEIVE_TIMEOUT_MS} ms. Every reply is
     * parsed into a {@link ContainerAnnouncement} and kept only if a test TCP
     * connection to its host and port succeeds. The connect timeout is read
     * from the system property {@code stream.container.timeout} (default 250
     * ms; 100 ms if the value cannot be parsed or is negative).
     *
     * @return the first container that was discovered and reachable, or
     *         {@code null} if none was found
     * @throws Exception if the query cannot be sent or the socket timeout
     *         cannot be set
     */
    public ContainerAnnouncement discover() throws Exception {
        // Take the same monitors the accessors use, so a concurrent snapshot
        // never observes a map while it is being cleared.
        synchronized (this.containers) {
            this.containers.clear();
        }
        synchronized (this.alive) {
            this.alive.clear();
        }

        int timeout = readConnectTimeout();
        log.info("Using connection timeout of {} ms", Integer.valueOf(timeout));

        DatagramPacket query = new DatagramPacket(ContainerAnnouncement.CONTAINER_QUERY, ContainerAnnouncement.CONTAINER_QUERY.length);
        query.setAddress(InetAddress.getByName(BROADCAST_ADDRESS));
        query.setPort(this.announcementPort);
        log.debug("Sending broadcast-query to {}:{}", query.getAddress(), Integer.valueOf(query.getPort()));
        this.discovery.send(query);
        log.debug("query sent...");

        // The receive timeout does not change between attempts, set it once.
        this.discovery.setSoTimeout(RECEIVE_TIMEOUT_MS);

        ArrayList<ContainerAnnouncement> discovered = new ArrayList<ContainerAnnouncement>();
        for (int attempt = 0; attempt < MAX_RECEIVE_ATTEMPTS; attempt++) {
            try {
                DatagramPacket p = new DatagramPacket(new byte[RECEIVE_BUFFER_SIZE], RECEIVE_BUFFER_SIZE);
                log.debug("receiving...");
                this.discovery.receive(p);

                // getData() always returns the backing buffer, so emptiness
                // has to be detected via the received length.
                if (p.getLength() <= 0) {
                    log.debug("received data-gram without data... {}", p);
                    continue;
                }

                ContainerAnnouncement announcement = new ContainerAnnouncement(p.getData());
                log.debug("Discovered container {} at {}://{}:{}", new Object[] {
                        announcement.getName(), announcement.getProtocol(), announcement.getHost(), announcement.getPort() });

                if (!canConnect(announcement, timeout)) {
                    continue;
                }

                discovered.add(announcement);
                synchronized (this.containers) {
                    this.containers.put(announcement.getName(), announcement);
                }
                synchronized (this.alive) {
                    this.alive.put(announcement.toString(), Long.valueOf(System.currentTimeMillis()));
                }
            }
            catch (SocketTimeoutException ste) {
                // No reply arrived within the receive timeout; try again.
                log.trace("No reply within {} ms", Integer.valueOf(RECEIVE_TIMEOUT_MS));
            }
            catch (Exception e) {
                log.error("Error: {}", e.getMessage());
                log.debug("Failed to process discovery reply", e);
            }
        }

        if (discovered.isEmpty()) {
            log.debug("No containers discovered!");
            return null;
        }
        log.info("Discovered containers: {}", discovered);
        return discovered.get(0);
    }

    /**
     * Reads the connect timeout from the {@code stream.container.timeout}
     * system property.
     *
     * @return the configured timeout in ms, or the fallback value if the
     *         property cannot be read, is not an integer, or is negative
     */
    private static int readConnectTimeout() {
        try {
            int timeout = Integer.parseInt(System.getProperty("stream.container.timeout", "250"));
            if (timeout < 0) {
                // Socket.connect rejects negative timeouts with an IllegalArgumentException.
                log.warn("Ignoring negative connection timeout {}", Integer.valueOf(timeout));
                return FALLBACK_CONNECT_TIMEOUT_MS;
            }
            return timeout;
        }
        catch (Exception e) {
            // Best effort: any problem reading or parsing the property falls back to the default.
            return FALLBACK_CONNECT_TIMEOUT_MS;
        }
    }

    /**
     * Opens and immediately closes a TCP connection to the announced container.
     * Failures are logged and reported through the return value; the test
     * socket is closed on every path.
     *
     * @param announcement the announcement whose host and port are probed
     * @param timeout connect timeout in ms
     * @return {@code true} if the connection was established
     */
    private boolean canConnect(ContainerAnnouncement announcement, int timeout) {
        Socket sock = new Socket();
        try {
            log.debug("Creating socket-address...");
            InetSocketAddress addr = new InetSocketAddress(announcement.getHost(), (int) announcement.getPort());
            log.debug("Checking connection to {}", addr);
            sock.connect(addr, timeout);
            if (!sock.isConnected()) {
                return false;
            }
            log.debug("Test-Connection succeeded.");
            return true;
        }
        catch (SocketTimeoutException e) {
            log.error("Cannot connect to container {}: {}", announcement, e.getMessage());
            log.trace("Connection attempt timed out", e);
            return false;
        }
        catch (Exception ce) {
            log.error("Found container at {}, but failed to connect: {}", announcement, ce.getMessage());
            log.debug("Connection attempt failed", ce);
            return false;
        }
        finally {
            try {
                sock.close();
                log.debug("Test-connection closed.");
            }
            catch (Exception ignored) {
                // Nothing useful can be done if closing the probe socket fails.
                log.trace("Failed to close test-connection", ignored);
            }
        }
    }

    /**
     * Repeatedly receives (and discards) one datagram from the discovery
     * socket and then sleeps for {@code interval} milliseconds, for as long as
     * {@code running} is {@code true}. The loop also ends when the socket has
     * been closed or the thread is interrupted while sleeping.
     */
    @Override
    public void run() {
        while (this.running) {
            if (this.discovery.isClosed()) {
                // receive() can never succeed again; avoid logging errors forever.
                log.debug("Discovery socket closed, stopping.");
                return;
            }
            try {
                DatagramPacket p = new DatagramPacket(new byte[RECEIVE_BUFFER_SIZE], RECEIVE_BUFFER_SIZE);
                this.discovery.receive(p);
            }
            catch (SocketTimeoutException ste) {
                // Expected once discover() has set a receive timeout on the shared socket.
                log.trace("No datagram received before timeout");
            }
            catch (Exception e) {
                log.error("Error while receiving datagram: {}", e.getMessage());
                log.debug("Receive failed", e);
            }
            try {
                Thread.sleep(this.interval);
            }
            catch (InterruptedException ie) {
                // Preserve the interrupt status for the caller and stop.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                log.error("Error while waiting for next receive: {}", e.getMessage());
                log.debug("Sleep failed", e);
            }
        }
    }

    /**
     * Logs, at debug level, every known container together with the time it
     * was last checked.
     */
    public void printContainers() {
        synchronized (this.alive) {
            for (Map.Entry<String, Long> entry : this.alive.entrySet()) {
                log.debug("  {}   (last checked: {})", entry.getKey(), new Date(entry.getValue()));
            }
        }
    }

    /**
     * Returns a snapshot of the reachable containers.
     *
     * @return a new map from announcement string to the time (ms since epoch)
     *         the container was last checked
     */
    public Map<String, Long> getContainers() {
        synchronized (this.alive) {
            return new LinkedHashMap<String, Long>(this.alive);
        }
    }

    /**
     * Returns the URLs of all known containers.
     *
     * @return a new map from container name to a URL of the form
     *         {@code protocol://host:port}
     */
    public Map<String, String> getContainerURLs() {
        LinkedHashMap<String, String> urls = new LinkedHashMap<String, String>();
        synchronized (this.containers) {
            for (Map.Entry<String, ContainerAnnouncement> entry : this.containers.entrySet()) {
                ContainerAnnouncement rem = entry.getValue();
                String url = rem.getProtocol() + "://" + rem.getHost() + ":" + rem.getPort();
                urls.put(entry.getKey(), url);
            }
        }
        return urls;
    }

    /**
     * Returns a snapshot of the known container announcements.
     *
     * @return a new map from container name to its announcement
     */
    public Map<String, ContainerAnnouncement> getAnnouncements() {
        synchronized (this.containers) {
            return new LinkedHashMap<String, ContainerAnnouncement>(this.containers);
        }
    }
}

