/*
 * Decompiled with CFR 0.152.
 */
package org.glassfish.jersey.examples.aggregator;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.glassfish.jersey.SslConfigurator;
import org.glassfish.jersey.client.ChunkedInput;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.glassfish.jersey.examples.aggregator.App;
import org.glassfish.jersey.examples.aggregator.DataAggregator;
import org.glassfish.jersey.examples.aggregator.DataListener;
import org.glassfish.jersey.examples.aggregator.Message;
import org.glassfish.jersey.message.GZipEncoder;
import org.glassfish.jersey.moxy.json.MoxyJsonFeature;

/**
 * {@link DataAggregator} that reads messages from the Twitter streaming API and
 * forwards each of them to a {@link DataListener} and, via HTTP PUT, to the
 * {@code message/stream} resource below {@code App.getApiUri()}.
 *
 * <p>Each call to {@link #start(String, DataListener)} runs two background tasks that
 * are connected by an in-memory queue: a <em>reader</em> that pulls messages off the
 * Twitter stream and a <em>forwarder</em> that drains the queue. {@link #stop()} only
 * raises a flag; the forwarder notices it within {@link #POLL_INTERVAL_MILLIS} and
 * then cancels the reader.
 */
public final class TwitterAggregator
implements DataAggregator {
    private static final Logger LOGGER = Logger.getLogger(TwitterAggregator.class.getName());

    /** Endpoint of the Twitter streaming filter API. */
    private static final String TWITTER_STREAM_URI = "https://stream.twitter.com/1.1/statuses/filter.json";

    /** How long the forwarder waits for a message before re-checking whether it should stop. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    /** Set by {@link #stop()}, cleared by {@link #start(String, DataListener)}; read by the forwarder task. */
    private volatile boolean cancelled;

    /** Colour stamped on every message before it is queued. */
    private final String rgbColor;

    /**
     * Creates an aggregator.
     *
     * @param rgbColor value passed to {@code Message.setRgbColor} for every message read.
     */
    public TwitterAggregator(String rgbColor) {
        this.rgbColor = rgbColor;
    }

    /**
     * Starts reading the Twitter stream filtered by {@code keywords} and forwarding the
     * messages. Returns immediately; all work happens on two background threads.
     *
     * <p>Listener callbacks: {@code onStart()} once the stream has been opened successfully,
     * {@code onMessage(..)} for each message, {@code onError()} if connecting or reading
     * fails, and {@code onComplete()} when the forwarder ends (after {@link #stop()}, after
     * the reader has finished for any reason, or after a forwarding failure).
     *
     * @param keywords    value of the {@code track} query parameter sent to Twitter.
     * @param msgListener receiver of the life-cycle and message callbacks.
     */
    @Override
    public void start(final String keywords, final DataListener msgListener) {
        this.cancelled = false;
        final BlockingQueue<Message> messages = new LinkedBlockingQueue<Message>();

        // shutdown() lets the already submitted task run to completion but allows the
        // worker thread to exit afterwards instead of lingering for the life of the JVM.
        ExecutorService readerExecutor = Executors.newSingleThreadExecutor();
        final Future<?> readerHandle = readerExecutor.submit(newReaderTask(keywords, msgListener, messages));
        readerExecutor.shutdown();

        ExecutorService forwarderExecutor = Executors.newSingleThreadExecutor();
        forwarderExecutor.submit(newForwarderTask(msgListener, messages, readerHandle));
        forwarderExecutor.shutdown();
    }

    /**
     * Requests the forwarder started by {@link #start(String, DataListener)} to stop.
     * The request is picked up asynchronously by the forwarder task.
     */
    @Override
    public void stop() {
        this.cancelled = true;
    }

    /**
     * Builds the task that connects to Twitter and pushes every received message into {@code messages}.
     */
    private Runnable newReaderTask(final String keywords, final DataListener msgListener,
            final BlockingQueue<Message> messages) {
        return new Runnable(){

            @Override
            public void run() {
                Client client = null;
                Response response = null;
                try {
                    try {
                        client = TwitterAggregator.newTwitterClient();
                        response = client.target(TWITTER_STREAM_URI)
                                .queryParam("track", keywords)
                                .request(MediaType.APPLICATION_JSON_TYPE)
                                .header("Host", "stream.twitter.com")
                                .header("User-Agent", "Jersey/2.0")
                                .header("Accept-Encoding", "gzip")
                                .get();
                    }
                    catch (Throwable t) {
                        // Nobody calls get() on this task's Future, so anything not reported
                        // here would disappear without the listener ever hearing about it.
                        LOGGER.log(Level.WARNING, "Error connecting to Twitter Streaming API", t);
                        msgListener.onError();
                        return;
                    }
                    if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL) {
                        LOGGER.log(Level.WARNING, "Error connecting to Twitter Streaming API: " + response.getStatus());
                        msgListener.onError();
                        return;
                    }
                    msgListener.onStart();
                    try {
                        TwitterAggregator.this.readMessages(response, messages);
                    }
                    catch (Throwable t) {
                        // The forwarder detects that this task has finished on its own; no
                        // marker element is queued (the queue rejects null elements).
                        LOGGER.log(Level.WARNING, "Reading from the Twitter stream has failed", t);
                        msgListener.onError();
                    }
                }
                finally {
                    try {
                        if (response != null) {
                            response.close();
                        }
                    }
                    finally {
                        if (client != null) {
                            client.close();
                        }
                    }
                }
            }
        };
    }

    /**
     * Creates the client used to talk to Twitter: SSL stores, connect timeout, JSON
     * support, basic authentication and GZip decoding.
     */
    private static Client newTwitterClient() {
        // NOTE(review): store locations and passwords are hard-coded; acceptable for an
        // example only - confirm before reusing this code elsewhere.
        SslConfigurator sslConfig = SslConfigurator.newInstance()
                .trustStoreFile("./truststore_client")
                .trustStorePassword("asdfgh")
                .keyStoreFile("./keystore_client")
                .keyPassword("asdfgh");
        return ClientBuilder.newBuilder()
                .sslContext(sslConfig.createSSLContext())
                .build()
                .property("jersey.config.client.connectTimeout", 2000)
                .register(new MoxyJsonFeature())
                .register(HttpAuthenticationFeature.basic(App.getTwitterUserName(), App.getTwitterUserPassword()))
                .register(GZipEncoder.class);
    }

    /**
     * Reads messages from {@code response} into {@code messages} until the chunked input
     * yields {@code null} or the current thread is interrupted.
     */
    private void readMessages(Response response, BlockingQueue<Message> messages) {
        ChunkedInput<Message> chunks = response.readEntity(new GenericType<ChunkedInput<Message>>(){});
        try {
            // NOTE(review): the interrupt flag is only examined between chunks; a read that is
            // blocked on the network presumably does not react until the next chunk - confirm.
            while (!Thread.interrupted()) {
                Message message = chunks.read();
                if (message == null) {
                    break;
                }
                message.setRgbColor(this.rgbColor);
                System.out.println(message.toString());
                try {
                    messages.put(message);
                }
                catch (InterruptedException e) {
                    // The forwarder cancelled this task; stop reading.
                    break;
                }
            }
        }
        finally {
            chunks.close();
        }
    }

    /**
     * Builds the task that drains {@code messages}, hands each message to the listener and
     * PUTs it to the {@code message/stream} resource. When it ends it cancels the reader
     * and calls {@code onComplete()} on the listener.
     */
    private Runnable newForwarderTask(final DataListener msgListener, final BlockingQueue<Message> messages,
            final Future<?> readerHandle) {
        return new Runnable(){

            @Override
            public void run() {
                Client resourceClient = null;
                try {
                    resourceClient = ClientBuilder.newClient();
                    resourceClient.register(new MoxyJsonFeature());
                    WebTarget messageStreamResource = resourceClient.target(App.getApiUri()).path("message/stream");
                    while (!TwitterAggregator.this.cancelled) {
                        // A timed poll (rather than take()) lets the loop observe stop() and
                        // the end of the reader even when no further message arrives.
                        Message message = messages.poll(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                        if (message == null) {
                            if (!readerHandle.isDone()) {
                                continue;
                            }
                            // The reader may have queued a last message between the timed-out
                            // poll and the isDone() check; look once more before giving up.
                            message = messages.poll();
                            if (message == null) {
                                LOGGER.info("Twitter stream reader has finished; no more messages to forward.");
                                break;
                            }
                        }
                        msgListener.onMessage(message);
                        Response r = messageStreamResource.request().put(Entity.json(message));
                        try {
                            if (r.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL) {
                                LOGGER.warning("Unexpected PUT message response status code: " + r.getStatus());
                            }
                        }
                        finally {
                            // Release the connection held by this response.
                            r.close();
                        }
                    }
                }
                catch (InterruptedException ex) {
                    // The interrupt flag is deliberately not restored: this thread belongs to a
                    // single-use executor and the callbacks below should not run with an
                    // interrupt pending.
                    LOGGER.log(Level.WARNING, "Waiting for a message has been interrupted.", ex);
                }
                finally {
                    readerHandle.cancel(true);
                    try {
                        msgListener.onComplete();
                    }
                    finally {
                        if (resourceClient != null) {
                            resourceClient.close();
                        }
                    }
                }
            }
        };
    }
}

