/*
 * Decompiled with CFR 0.152.
 */
package pl.touk.nussknacker.engine.javaexample;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import pl.touk.nussknacker.engine.api.CustomStreamTransformer;
import pl.touk.nussknacker.engine.api.LazyParameter;
import pl.touk.nussknacker.engine.api.MethodToInvoke;
import pl.touk.nussknacker.engine.api.ParamName;
import pl.touk.nussknacker.engine.api.ValueWithContext;
import pl.touk.nussknacker.engine.api.util.MultiMap;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomStreamTransformation;
import pl.touk.nussknacker.engine.flink.api.state.TimestampedEvictableStateFunction;
import pl.touk.nussknacker.engine.flink.javaapi.process.JavaFlinkCustomStreamTransformation;
import scala.collection.JavaConversions;
import scala.concurrent.duration.Duration;

/**
 * Custom stream transformer that, for every incoming event, emits an {@link EventCount}
 * computed from the per-key state kept by {@link CounterFunction}.
 *
 * <p>The stream is keyed by the evaluated {@code key} parameter; the {@code length}
 * parameter is parsed as a duration and handed to {@link CounterFunction} in milliseconds.
 */
public class EventsCounter
extends CustomStreamTransformer {
    /**
     * Builds the Flink transformation: evaluate the key expression for each element,
     * key the stream by the resulting string and run {@link CounterFunction} on it.
     *
     * @param lazyParameter lazily evaluated expression producing the key of each event
     * @param string2 length of the counting period, in a textual form accepted by
     *                {@code scala.concurrent.duration.Duration.apply(String)}
     * @return transformation whose output values are {@link EventCount} instances
     */
    @MethodToInvoke(returnType=EventCount.class)
    public FlinkCustomStreamTransformation execute(@ParamName(value="key") LazyParameter<String> lazyParameter, @ParamName(value="length") String string2) {
        return JavaFlinkCustomStreamTransformation.apply((stream, nodeContext) -> {
            // Parsed when the transformation is applied, exactly as before.
            long lengthInMillis = Duration.apply(string2).toMillis();
            return stream
                    .map(nodeContext.nodeServices().lazyMapFunction(lazyParameter))
                    // Anonymous class (not a lambda) keeps the key type visible in the class signature.
                    .keyBy(new KeySelector<ValueWithContext<String>, String>() {
                        @Override
                        public String getKey(ValueWithContext<String> keyed) throws Exception {
                            return keyed.value();
                        }
                    })
                    .process(new CounterFunction(lengthInMillis));
        });
    }

    /**
     * Keyed function that records one entry per processed element in a {@link MultiMap}
     * state, indexed by the element timestamp, and emits the sum of all values currently
     * held in that map.
     */
    public static class CounterFunction
    extends TimestampedEvictableStateFunction<ValueWithContext<String>, ValueWithContext<Object>, Integer> {
        /** Offset, in milliseconds, passed to {@code moveEvictionTime} for every element. */
        private final long lengthInMillis;

        /**
         * @param l length of the counting period in milliseconds
         */
        public CounterFunction(long l) {
            this.lengthInMillis = l;
        }

        /**
         * Describes the state used by this function: a value state named {@code "state"}
         * holding a {@link MultiMap}.
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public ValueStateDescriptor<MultiMap<Object, Integer>> stateDescriptor() {
            // A class literal cannot carry type arguments, so the descriptor has to be
            // created raw; the declared return type restores the generic view.
            return new ValueStateDescriptor("state", MultiMap.class);
        }

        /**
         * Adds the current element to the state and emits the updated total.
         *
         * @param valueWithContext incoming element; only its context is propagated to the output
         * @param context Flink context supplying the element timestamp
         * @param collector receives one {@link EventCount} wrapped with the input's context
         */
        @Override
        public void processElement(ValueWithContext<String> valueWithContext, Context context, Collector<ValueWithContext<Object>> collector) throws Exception {
            // NOTE(review): timestamp() is unboxed here; presumably this fails with an NPE
            // when the element carries no timestamp - confirm timestamps are always assigned.
            long timestamp = context.timestamp();
            // NOTE(review): presumably schedules eviction of entries older than lengthInMillis;
            // the behaviour lives in TimestampedEvictableStateFunction - confirm there.
            this.moveEvictionTime(this.lengthInMillis, context);

            // add(...) returns the map that is then written back to the state.
            MultiMap<Object, Integer> updated = this.stateValue().add((Object)timestamp, 1);
            this.state().update(updated);

            // Sum every value stored under every timestamp, without intermediate boxing.
            int total = JavaConversions.asJavaCollection(updated.map().values()).stream()
                    .mapToInt(perTimestamp -> JavaConversions.asJavaCollection(perTimestamp).stream().mapToInt(Integer::intValue).sum())
                    .sum();

            // The element type must be exactly ValueWithContext<Object> to match the collector.
            collector.collect(new ValueWithContext<Object>(new EventCount(total), valueWithContext.context()));
        }
    }

    /**
     * Immutable result value carrying the number of counted events.
     */
    public static class EventCount {
        private final long count;

        /**
         * @param l number of counted events
         */
        public EventCount(long l) {
            this.count = l;
        }

        /**
         * @return the number of counted events
         */
        public long count() {
            return this.count;
        }
    }
}

