/*
 * Decompiled with CFR 0.152.
 */
package de.softwareforge.kafka;

import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import de.softwareforge.kafka.LoaderOptions;
import de.softwareforge.kafka.LongPartitioner;
import de.softwareforge.kafka.TpchMain;
import io.airlift.command.Command;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.airlift.log.LoggingConfiguration;
import io.airlift.log.LoggingMBean;
import io.airlift.tpch.TpchEntity;
import io.airlift.tpch.TpchTable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

@Command(name="load", description="Load TPCH data into Kafka")
public class LoadCommand
extends TpchMain.TpchCommand {
    private static final Logger LOG = Logger.get(LoadCommand.class);
    private final ObjectMapperProvider objectMapperProvider = new ObjectMapperProvider();
    @Inject
    public LoaderOptions loaderOptions = new LoaderOptions();

    @Override
    public void execute() throws Exception {
        ImmutableList tables;
        Logging logging = Logging.initialize();
        logging.configure(new LoggingConfiguration());
        new LoggingMBean().setLevel("kafka", "ERROR");
        String tableNames = this.loaderOptions.tables;
        ImmutableMap allTables = ImmutableMap.copyOf((Map)Maps.uniqueIndex((Iterable)TpchTable.getTables(), (Function)new Function<TpchTable<?>, String>(){

            public String apply(@Nonnull TpchTable<?> input) {
                return input.getTableName();
            }
        }));
        if (tableNames == null) {
            tables = ImmutableList.copyOf(allTables.keySet());
        } else {
            ImmutableList.Builder builder = ImmutableList.builder();
            for (String tableName : Splitter.on((String)",").omitEmptyStrings().trimResults().split((CharSequence)tableNames)) {
                Preconditions.checkState((boolean)allTables.keySet().contains(tableName), (String)"Table %s is unknown", (Object[])new Object[]{tableName});
                builder.add((Object)tableName);
            }
            tables = builder.build();
        }
        LOG.info("Processing tables: %s", new Object[]{tables});
        Properties props = new Properties();
        props.put("zk.connect", this.loaderOptions.zookeeper);
        props.put("serializer.class", StringEncoder.class.getName());
        props.put("partitioner.class", LongPartitioner.class.getName());
        props.put("serializer.encoding", "UTF8");
        props.put("request.required.acks", "1");
        ProducerConfig producerConfig = new ProducerConfig(props);
        ObjectMapper mapper = this.objectMapperProvider.get();
        mapper.enable(new MapperFeature[]{MapperFeature.AUTO_DETECT_GETTERS});
        Producer producer = new Producer(producerConfig);
        ListeningExecutorService executor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newCachedThreadPool());
        ImmutableList.Builder futureBuilder = ImmutableList.builder();
        for (String table : tables) {
            ListenableFuture future = executor.submit((Callable)new Callable<Long>((Map)allTables, table, mapper, producer){
                final /* synthetic */ Map val$allTables;
                final /* synthetic */ String val$table;
                final /* synthetic */ ObjectMapper val$mapper;
                final /* synthetic */ Producer val$producer;
                {
                    this.val$allTables = map;
                    this.val$table = string;
                    this.val$mapper = objectMapper;
                    this.val$producer = producer;
                }

                @Override
                public Long call() throws Exception {
                    TpchTable tpchTable = (TpchTable)this.val$allTables.get(this.val$table);
                    LOG.info("Loading table '%s' into topic '%s%s'...", new Object[]{this.val$table, LoadCommand.this.loaderOptions.prefix, this.val$table});
                    long count = 0L;
                    for (List partition : Iterables.partition((Iterable)tpchTable.createGenerator(LoadCommand.this.loaderOptions.tpchType.getScaleFactor(), 1, 1), (int)100)) {
                        ImmutableList.Builder builder = ImmutableList.builder();
                        for (TpchEntity o : partition) {
                            builder.add((Object)this.val$mapper.writeValueAsString((Object)o));
                            ++count;
                        }
                        ProducerData message = new ProducerData(LoadCommand.this.loaderOptions.prefix + this.val$table, (List)builder.build());
                        this.val$producer.send(message);
                    }
                    LOG.info("Generated %d rows for table '%s'.", new Object[]{count, this.val$table});
                    return count;
                }
            });
            futureBuilder.add((Object)future);
        }
        Futures.allAsList((Iterable)futureBuilder.build()).get();
        executor.shutdown();
        executor.awaitTermination(1L, TimeUnit.DAYS);
        producer.close();
    }
}

