/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.twister2.translators.batch;

import edu.iu.dsc.tws.tset.sets.batch.BatchTSetImpl;
import edu.iu.dsc.tws.tset.sets.batch.ComputeTSet;
import edu.iu.dsc.tws.tset.sets.batch.KeyedTSet;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.twister2.Twister2BatchTranslationContext;
import org.apache.beam.runners.twister2.translators.BatchTransformTranslator;
import org.apache.beam.runners.twister2.translators.functions.ByteToWindowFunction;
import org.apache.beam.runners.twister2.translators.functions.GroupByWindowFunction;
import org.apache.beam.runners.twister2.translators.functions.MapToTupleFunction;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.WindowingStrategy;

public class GroupByKeyTranslatorBatch<K, V>
implements BatchTransformTranslator<GroupByKey<K, V>> {
    @Override
    public void translateNode(GroupByKey<K, V> transform, Twister2BatchTranslationContext context) {
        PCollection input = (PCollection)context.getInput(transform);
        BatchTSetImpl inputTTSet = context.getInputDataSet((PValue)input);
        KvCoder coder = (KvCoder)((PCollection)context.getInput(transform)).getCoder();
        Coder inputKeyCoder = ((KvCoder)input.getCoder()).getKeyCoder();
        WindowingStrategy windowingStrategy = input.getWindowingStrategy();
        WindowFn windowFn = windowingStrategy.getWindowFn();
        WindowedValue.FullWindowedValueCoder wvCoder = WindowedValue.FullWindowedValueCoder.of((Coder)coder.getValueCoder(), (Coder)windowFn.windowCoder());
        KeyedTSet keyedTSet = inputTTSet.mapToTuple(new MapToTupleFunction(inputKeyCoder, wvCoder));
        ComputeTSet groupedbyKeyTset = keyedTSet.keyedGather().map(new ByteToWindowFunction(inputKeyCoder, wvCoder));
        ComputeTSet outputTset = groupedbyKeyTset.direct().flatmap(new GroupByWindowFunction(windowingStrategy, SystemReduceFn.buffering((Coder)coder.getValueCoder())));
        PCollection output = (PCollection)context.getOutput(transform);
        context.setOutputDataSet(output, outputTset);
    }
}

