/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.twister2.examples;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.Twister2Worker;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.tset.env.BatchEnvironment;
import edu.iu.dsc.tws.tset.env.TSetEnvironment;
import java.io.Serializable;
import java.util.regex.Pattern;
import org.apache.beam.runners.twister2.Twister2LegacyRunner;
import org.apache.beam.runners.twister2.Twister2PipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class WordCount
implements Serializable,
Twister2Worker {
    public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

    /**
     * Runs the word-count pipeline on this worker.
     *
     * <p>Initializes a batch TSet environment from {@code workerEnv}, reads the {@code "input"}
     * and {@code "output"} entries from its configuration, prints the worker id, and then passes
     * a {@link Twister2PipelineOptions} bound to that environment (with
     * {@link Twister2LegacyRunner} as the runner) to
     * {@link #runWordCount(Twister2PipelineOptions, String, String)}.
     *
     * @param workerEnv worker environment used to initialize the batch environment
     * @throws IllegalArgumentException if the {@code "input"} or {@code "output"} configuration
     *     value is absent
     */
    public void execute(WorkerEnvironment workerEnv) {
        BatchEnvironment env = TSetEnvironment.initBatch(workerEnv);
        Config config = env.getConfig();
        String input = config.getStringValue("input");
        String output = config.getStringValue("output");
        System.out.println("Rank " + env.getWorkerID());

        // Fail fast with a message naming the missing key instead of letting a null path
        // surface later during pipeline construction.
        // NOTE(review): assumes getStringValue returns null for an absent key — confirm
        // against the Twister2 Config API.
        if (input == null) {
            throw new IllegalArgumentException("Missing required configuration value: input");
        }
        if (output == null) {
            throw new IllegalArgumentException("Missing required configuration value: output");
        }

        Twister2PipelineOptions options = PipelineOptionsFactory.as(Twister2PipelineOptions.class);
        options.setTSetEnvironment(env);
        // options is already a Twister2PipelineOptions, so no as(...) conversion is needed.
        options.setRunner(Twister2LegacyRunner.class);
        WordCount.runWordCount(options, input, output);
    }

    /**
     * Builds the word-count pipeline and runs it, blocking until it finishes.
     *
     * <p>The pipeline reads lines via {@code TextIO.read().from(input)}, counts words with
     * {@link CountWords}, formats each count with {@link FormatAsTextFn}, and writes the
     * formatted lines via {@code TextIO.write().to(output)}.
     *
     * @param options pipeline options used to create the pipeline
     * @param input value passed to {@code TextIO.read().from(...)}
     * @param output value passed to {@code TextIO.write().to(...)}
     */
    static void runWordCount(Twister2PipelineOptions options, String input, String output) {
        Pipeline p = Pipeline.create(options);
        // Typed fluent chain: each apply(...) returns the transform's declared output type,
        // so the compiler checks that adjacent stages agree and no raw casts are needed.
        p.apply("ReadLines", TextIO.read().from(input))
            .apply(new CountWords())
            .apply(MapElements.via(new FormatAsTextFn()))
            .apply("WriteCounts", TextIO.write().to(output));
        p.run().waitUntilFinish();
    }

    /**
     * Pipeline options describing where a word-count run reads from and writes to.
     *
     * <p>The input file has a default value; the output is marked as required. This interface
     * is not referenced by the other code visible in this file.
     */
    public interface WordCountOptions extends PipelineOptions {
        /** Returns the path of the file to read from. */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        String getInputFile();

        /** Sets the path of the file to read from. */
        void setInputFile(String value);

        /** Returns the path of the file to write to. */
        @Description("Path of the file to write to")
        @Validation.Required
        String getOutput();

        /** Sets the path of the file to write to. */
        void setOutput(String value);
    }

    /**
     * Composite transform that turns a collection of text lines into per-word occurrence counts.
     *
     * <p>Lines are tokenized by {@link ExtractWordsFn} and the resulting words are counted with
     * {@code Count.perElement()}.
     */
    public static class CountWords
        extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
        /**
         * Expands this transform into its tokenizing and counting steps.
         *
         * @param lines lines of text to tokenize
         * @return one {@code KV} per distinct word, keyed by the word with its count as the value
         */
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
            // Fully parameterized types let the compiler verify the stages line up; the
            // previous raw PCollection locals relied on an unchecked conversion at return.
            PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
            return words.apply(Count.perElement());
        }
    }

    /** Formats a word/count pair as the text line {@code "<word>: <count>"}. */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        /**
         * Renders one word count as text.
         *
         * @param input pair whose key is the word and whose value is its count
         * @return the key, followed by {@code ": "}, followed by the value
         */
        public String apply(KV<String, Long> input) {
            String word = input.getKey();
            Long count = input.getValue();
            return word + ": " + count;
        }
    }

    /**
     * Splits each input line into words and emits every non-empty word.
     *
     * <p>Lines are split on {@link WordCount#TOKENIZER_PATTERN}. For every element the line
     * length is recorded in the {@code lineLenDistro} distribution, and lines that are empty
     * after trimming increment the {@code emptyLines} counter.
     */
    static class ExtractWordsFn extends DoFn<String, String> {
        /**
         * Tokenizer regex compiled once for the class; {@code String.split} would recompile
         * it for every element processed.
         */
        private static final Pattern WORD_SEPARATOR = Pattern.compile(WordCount.TOKENIZER_PATTERN);

        private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
        private final Distribution lineLenDist =
            Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

        /**
         * Tokenizes one line and outputs its words.
         *
         * @param element the line of text to tokenize
         * @param receiver receiver that each non-empty word is output to
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String element, DoFn.OutputReceiver<String> receiver) {
            lineLenDist.update(element.length());
            if (element.trim().isEmpty()) {
                emptyLines.inc();
            }
            // Limit -1 keeps trailing empty strings, matching the previous split call;
            // empty tokens are filtered out below either way.
            for (String word : WORD_SEPARATOR.split(element, -1)) {
                if (!word.isEmpty()) {
                    // Pass the String directly: OutputReceiver<String> does not accept Object.
                    receiver.output(word);
                }
            }
        }
    }
}

