/*
 * Decompiled with CFR 0.152.
 */
package pl.touk.nussknacker.engine.javaexample;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import pl.touk.nussknacker.engine.api.CustomStreamTransformer;
import pl.touk.nussknacker.engine.api.ProcessListener;
import pl.touk.nussknacker.engine.api.Service;
import pl.touk.nussknacker.engine.api.exception.ExceptionHandlerFactory;
import pl.touk.nussknacker.engine.api.process.SingleNodeConfig$;
import pl.touk.nussknacker.engine.api.process.SinkFactory;
import pl.touk.nussknacker.engine.api.process.SourceFactory;
import pl.touk.nussknacker.engine.api.process.WithCategories;
import pl.touk.nussknacker.engine.api.signal.ProcessSignalSender;
import pl.touk.nussknacker.engine.api.test.TestParsingUtils;
import pl.touk.nussknacker.engine.example.LoggingExceptionHandlerFactory;
import pl.touk.nussknacker.engine.javaapi.process.ExpressionConfig;
import pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator;
import pl.touk.nussknacker.engine.javaexample.ClientService;
import pl.touk.nussknacker.engine.javaexample.EventsCounter;
import pl.touk.nussknacker.engine.javaexample.Transaction;
import pl.touk.nussknacker.engine.javaexample.TransactionAmountAggregator;
import pl.touk.nussknacker.engine.javaexample.UtilProcessHelper;
import pl.touk.nussknacker.engine.kafka.KafkaConfig;
import pl.touk.nussknacker.engine.kafka.KafkaSinkFactory;
import pl.touk.nussknacker.engine.kafka.KafkaSourceFactory;
import scala.Option;
import scala.collection.JavaConversions;

public class ExampleProcessConfigCreator
implements ProcessConfigCreator {
    private <T> WithCategories<T> all(T t) {
        ArrayList<String> arrayList = new ArrayList<String>();
        arrayList.add("Recommendations");
        arrayList.add("FraudDetection");
        return new WithCategories(t, JavaConversions.collectionAsScalaIterable(arrayList).toList(), SingleNodeConfig$.MODULE$.zero());
    }

    public Map<String, WithCategories<Service>> services(Config config) {
        HashMap<String, WithCategories<Service>> hashMap = new HashMap<String, WithCategories<Service>>();
        hashMap.put("clientService", this.all(new ClientService()));
        return hashMap;
    }

    public Map<String, WithCategories<SourceFactory<?>>> sourceFactories(Config config) {
        KafkaConfig kafkaConfig = this.getKafkaConfig(config);
        KafkaSourceFactory<Transaction> kafkaSourceFactory = this.getTransactionKafkaSourceFactory(kafkaConfig);
        HashMap hashMap = new HashMap();
        hashMap.put("kafka-transaction", this.all(kafkaSourceFactory));
        return hashMap;
    }

    private KafkaSourceFactory<Transaction> getTransactionKafkaSourceFactory(KafkaConfig kafkaConfig) {
        BoundedOutOfOrdernessTimestampExtractor<Transaction> boundedOutOfOrdernessTimestampExtractor = new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.minutes((long)10L)){

            public long extractTimestamp(Transaction transaction) {
                return transaction.eventDate;
            }
        };
        DeserializationSchema<Transaction> deserializationSchema = new DeserializationSchema<Transaction>(){

            public Transaction deserialize(byte[] byArray) throws IOException {
                return (Transaction)new ObjectMapper().readValue(byArray, Transaction.class);
            }

            public boolean isEndOfStream(Transaction transaction) {
                return false;
            }

            public TypeInformation<Transaction> getProducedType() {
                return TypeInformation.of(Transaction.class);
            }
        };
        return new KafkaSourceFactory(kafkaConfig, (DeserializationSchema)deserializationSchema, Option.apply((Object)boundedOutOfOrdernessTimestampExtractor), TestParsingUtils.newLineSplit(), TypeInformation.of(Transaction.class));
    }

    public Map<String, WithCategories<SinkFactory>> sinkFactories(Config config) {
        KafkaConfig kafkaConfig = this.getKafkaConfig(config);
        KeyedSerializationSchema<Object> keyedSerializationSchema = new KeyedSerializationSchema<Object>(){

            public byte[] serializeKey(Object object) {
                return UUID.randomUUID().toString().getBytes();
            }

            public byte[] serializeValue(Object object) {
                if (object instanceof String) {
                    return ((String)object).getBytes();
                }
                throw new RuntimeException("Sorry, only strings");
            }

            public String getTargetTopic(Object object) {
                return null;
            }
        };
        KafkaSinkFactory kafkaSinkFactory = new KafkaSinkFactory(kafkaConfig, (KeyedSerializationSchema)keyedSerializationSchema);
        HashMap<String, WithCategories<SinkFactory>> hashMap = new HashMap<String, WithCategories<SinkFactory>>();
        hashMap.put("kafka-stringSink", this.all(kafkaSinkFactory));
        return hashMap;
    }

    public Map<String, WithCategories<CustomStreamTransformer>> customStreamTransformers(Config config) {
        HashMap<String, WithCategories<CustomStreamTransformer>> hashMap = new HashMap<String, WithCategories<CustomStreamTransformer>>();
        hashMap.put("eventsCounter", this.all(new EventsCounter()));
        hashMap.put("transactionAmountAggregator", this.all(new TransactionAmountAggregator()));
        return hashMap;
    }

    public Map<String, WithCategories<ProcessSignalSender>> signals(Config config) {
        return Collections.emptyMap();
    }

    public Collection<ProcessListener> listeners(Config config) {
        return Collections.emptyList();
    }

    public ExceptionHandlerFactory exceptionHandlerFactory(Config config) {
        return new LoggingExceptionHandlerFactory(config);
    }

    public ExpressionConfig expressionConfig(Config config) {
        return new ExpressionConfig(Collections.singletonMap("UTIL", this.all(new UtilProcessHelper())), Collections.emptyList());
    }

    public Map<String, String> buildInfo() {
        return Collections.emptyMap();
    }

    private KafkaConfig getKafkaConfig(Config config) {
        return new KafkaConfig(config.getString("kafka.zkAddress"), config.getString("kafka.kafkaAddress"), Option.empty(), Option.empty());
    }
}

