/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.hadoop.flink.sample;

import java.io.Serializable;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.infinispan.hadoop.InfinispanInputFormat;

public class WordFrequency {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: WordFrequency <ispn-server-ip>");
            System.exit(1);
        }
        Configuration configuration = new Configuration();
        configuration.set("hadoop.ispn.input.remote.cache.servers", args[0]);
        configuration.set("hadoop.ispn.input.cache.name", "phrases");
        Job job = Job.getInstance((Configuration)configuration, (String)"Infinispan Integration");
        InfinispanInputFormat infinispanInputFormat = new InfinispanInputFormat();
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource infinispanDS = env.createInput((org.apache.flink.api.common.io.InputFormat)HadoopInputs.createHadoopInput((InputFormat)infinispanInputFormat, Integer.class, String.class, (Job)job));
        long count = infinispanDS.count();
        SingleInputUdfOperator values = infinispanDS.map((MapFunction & Serializable)entry -> (String)entry.f1).returns(String.class);
        FlatMapOperator lengthsCount = values.flatMap((FlatMapFunction)new FlatMapFunction<String, Tuple2<Integer, Integer>>(){

            public void flatMap(String s, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
                collector.collect((Object)new Tuple2((Object)s.split(" ").length, (Object)1));
            }
        });
        List results = lengthsCount.groupBy(new int[]{0}).sum(1).collect();
        WordFrequency.printResults(count, results);
    }

    private static void printResults(long entries, List<Tuple2<Integer, Integer>> results) {
        System.out.printf("TOTAL PHRASES ANALYZED: %d. HISTOGRAM:\n", entries);
        results.forEach(t -> {
            Integer wordNumber = (Integer)t.f0;
            Integer count = (Integer)t.f1;
            System.out.printf("%-3d word phrases:", wordNumber);
            IntStream.range(1, count).boxed().forEach(c -> System.out.print("*"));
            System.out.printf("(%d)\n", count);
        });
    }
}

