/*
 * Decompiled with CFR 0.152.
 */
package pl.touk.nussknacker.engine.javaexample;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import pl.touk.nussknacker.engine.api.CustomStreamTransformer;
import pl.touk.nussknacker.engine.api.LazyParameter;
import pl.touk.nussknacker.engine.api.MethodToInvoke;
import pl.touk.nussknacker.engine.api.ParamName;
import pl.touk.nussknacker.engine.api.ValueWithContext;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomStreamTransformation;
import pl.touk.nussknacker.engine.flink.javaapi.process.JavaFlinkCustomStreamTransformation;
import pl.touk.nussknacker.engine.javaexample.Transaction;

public class TransactionAmountAggregator
extends CustomStreamTransformer {
    @MethodToInvoke
    public FlinkCustomStreamTransformation execute(@ParamName(value="clientId") LazyParameter<String> lazyParameter) {
        return JavaFlinkCustomStreamTransformation.apply((dataStream, flinkCustomNodeContext) -> dataStream.map(flinkCustomNodeContext.nodeServices().lazyMapFunction(lazyParameter)).keyBy((KeySelector)new KeySelector<ValueWithContext<String>, String>(){

            public String getKey(ValueWithContext<String> valueWithContext) throws Exception {
                return (String)valueWithContext.value();
            }
        }).map(this.amountAggregateFunction()));
    }

    private RichMapFunction<ValueWithContext<String>, ValueWithContext<Object>> amountAggregateFunction() {
        return new RichMapFunction<ValueWithContext<String>, ValueWithContext<Object>>(){
            ValueState<AggregatedAmount> state = null;

            public void open(Configuration configuration) throws Exception {
                super.open(configuration);
                TypeInformation typeInformation = TypeInformation.of(AggregatedAmount.class);
                ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("state", typeInformation.createSerializer(this.getRuntimeContext().getExecutionConfig()));
                this.state = this.getRuntimeContext().getState(valueStateDescriptor);
            }

            public ValueWithContext<Object> map(ValueWithContext<String> valueWithContext) throws Exception {
                Transaction transaction = (Transaction)valueWithContext.context().apply("input");
                int n = transaction.amount + (this.state.value() == null ? 0 : ((AggregatedAmount)this.state.value()).amount);
                this.state.update((Object)new AggregatedAmount(transaction.clientId, n));
                return new ValueWithContext(this.state.value(), valueWithContext.context());
            }
        };
    }

    public static class AggregatedAmount {
        public String clientId;
        public int amount;

        public AggregatedAmount(String string, int n) {
            this.clientId = string;
            this.amount = n;
        }
    }
}

