/*
 * Decompiled with CFR 0.152.
 */
package org.kitesdk.tools;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.io.Closeables;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.crunch.DoFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hive.conf.HiveConf;
import org.kitesdk.compat.DynMethods;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.View;
import org.kitesdk.data.crunch.CrunchDatasets;
import org.kitesdk.tools.TaskUtil;

/**
 * Copies the entities of one {@link View} into another. It applies a Crunch
 * {@link DoFn} to each source entity and checks that every result is an
 * instance of the target view's entity type.
 *
 * <p>When either dataset's location uses the {@code file} scheme, the task runs
 * in memory with {@link MemPipeline} and writes through a single
 * {@link DatasetWriter}. Otherwise it runs as a MapReduce job via
 * {@link MRPipeline} and appends to the target, optionally partitioning
 * ("compacting") the output first.
 *
 * @param <S> entity type of the source view
 * @param <T> entity type of the target view
 */
public class TransformTask<S, T>
extends Configured {
    /**
     * Looks up {@code valueOf(String)} on whichever Hadoop task-counter enum is on
     * the classpath (old {@code mapred} or new {@code mapreduce} API). With
     * {@code defaultNoop()} the lookup yields {@code null} if neither is present.
     */
    private static final DynMethods.StaticMethod GET_ENUM_BY_NAME = new DynMethods.Builder("valueOf")
        .impl("org.apache.hadoop.mapred.Task$Counter", new Class<?>[]{String.class})
        .impl("org.apache.hadoop.mapreduce.TaskCounter", new Class<?>[]{String.class})
        .defaultNoop()
        .buildStatic();

    /** The MAP_INPUT_RECORDS counter, or {@code null} if it could not be resolved. */
    private static final Enum<?> MAP_INPUT_RECORDS =
        (Enum<?>) GET_ENUM_BY_NAME.invoke(new Object[]{"MAP_INPUT_RECORDS"});

    /** URI scheme that forces the in-memory (non-MapReduce) execution path. */
    private static final String LOCAL_FS_SCHEME = "file";

    private final View<S> from;
    private final View<T> to;
    private final DoFn<S, T> transform;
    private boolean compact = true;
    // -1 is passed through unchanged to CrunchDatasets.partition; presumably it
    // means "let Kite choose" -- TODO confirm against CrunchDatasets.
    private int numWriters = -1;
    private long count = 0L;

    /**
     * @param from      view to read source entities from
     * @param to        view to write transformed entities to
     * @param transform function applied to every source entity
     */
    public TransformTask(View<S> from, View<T> to, DoFn<S, T> transform) {
        this.from = from;
        this.to = to;
        this.transform = transform;
    }

    /**
     * Returns the record count from the most recent {@link #run()}.
     *
     * <p>In MapReduce mode this is the MAP_INPUT_RECORDS counter of the first
     * stage, or 0 if that counter is unavailable. In in-memory mode it is the
     * number of entities written to the target.
     */
    public long getCount() {
        return this.count;
    }

    /**
     * Disables output partitioning in MapReduce mode.
     *
     * @return this task, for chaining
     */
    public TransformTask<S, T> noCompaction() {
        this.compact = false;
        this.numWriters = 0;
        return this;
    }

    /**
     * Sets the number of writers used when partitioning output in MapReduce mode.
     * A value of 0 is equivalent to {@link #noCompaction()}.
     *
     * @param numWriters number of writers; must be non-negative
     * @return this task, for chaining
     * @throws IllegalArgumentException if {@code numWriters} is negative
     */
    public TransformTask<S, T> setNumWriters(int numWriters) {
        Preconditions.checkArgument(numWriters >= 0,
            "Invalid number of reducers: %s", numWriters);
        if (numWriters == 0) {
            this.noCompaction();
        } else {
            this.numWriters = numWriters;
        }
        return this;
    }

    /**
     * Runs the transformation. It uses an in-memory pipeline if either dataset
     * is on the local file system, and a MapReduce pipeline otherwise.
     *
     * @return the result of the executed pipeline
     * @throws IOException if writing or closing the target writer fails
     */
    public PipelineResult run() throws IOException {
        // Reset so getCount() reflects only this run. Without it, the in-memory
        // path would add to the total left by a previous run().
        this.count = 0L;

        boolean runInParallel =
            !(isLocal(this.from.getDataset()) || isLocal(this.to.getDataset()));
        AvroType<T> toPType = ptype(this.to);
        CheckEntityClass<T> validate = new CheckEntityClass<T>(this.to.getType());

        if (runInParallel) {
            return runMapReduce(toPType, validate);
        }
        return runInMemory(toPType, validate);
    }

    /** Executes the task as a MapReduce job that appends to the target view. */
    private PipelineResult runMapReduce(AvroType<T> toPType, CheckEntityClass<T> validate)
            throws IOException {
        TaskUtil.configure(this.getConf())
            .addJarPathForClass(HiveConf.class)
            .addJarForClass(AvroKeyInputFormat.class);

        Pipeline pipeline = new MRPipeline(this.getClass(), this.getConf());
        PCollection<T> collection = readTransformed(pipeline, toPType, validate);
        if (this.compact) {
            collection = CrunchDatasets.partition(collection, this.to, this.numWriters);
        }
        pipeline.write(collection, CrunchDatasets.asTarget(this.to), Target.WriteMode.APPEND);

        PipelineResult result = pipeline.done();
        PipelineResult.StageResult firstStage = Iterables.getFirst(result.getStageResults(), null);
        if (firstStage != null && MAP_INPUT_RECORDS != null) {
            this.count = firstStage.getCounterValue(MAP_INPUT_RECORDS);
        }
        return result;
    }

    /**
     * Executes the task in memory, streaming every transformed entity into a
     * single writer on the target view.
     */
    private PipelineResult runInMemory(AvroType<T> toPType, CheckEntityClass<T> validate)
            throws IOException {
        Pipeline pipeline = MemPipeline.getInstance();
        PCollection<T> collection = readTransformed(pipeline, toPType, validate);

        boolean threw = true;
        DatasetWriter<T> writer = null;
        try {
            writer = this.to.newWriter();
            for (T entity : collection.materialize()) {
                writer.write(entity);
                this.count += 1;
            }
            threw = false;
        } finally {
            // Swallow close() IOExceptions only when the body already threw, so
            // the original failure is the one that propagates.
            Closeables.close(writer, threw);
        }
        return pipeline.done();
    }

    /** Reads the source view, applies the transform, then validates entity types. */
    private PCollection<T> readTransformed(Pipeline pipeline, AvroType<T> toPType,
                                           CheckEntityClass<T> validate) {
        return pipeline.read(CrunchDatasets.asSource(this.from))
            .parallelDo(this.transform, toPType)
            .parallelDo(validate, toPType);
    }

    /** Returns true when the dataset's location URI has the {@code file} scheme. */
    private static boolean isLocal(Dataset<?> dataset) {
        URI location = dataset.getDescriptor().getLocation();
        return location != null && LOCAL_FS_SCHEME.equals(location.getScheme());
    }

    /**
     * Chooses the Avro PType for a view. Generic records use the dataset schema;
     * any other type is resolved from its class.
     */
    @java.lang.SuppressWarnings("unchecked")
    private static <T> AvroType<T> ptype(View<T> view) {
        Class<T> recordClass = view.getType();
        if (GenericRecord.class.isAssignableFrom(recordClass)) {
            // Safe: recordClass is a GenericRecord type, matching Avros.generics' output.
            return (AvroType<T>) Avros.generics(view.getDataset().getDescriptor().getSchema());
        }
        return Avros.records(recordClass);
    }

    /**
     * Pass-through {@link MapFn} that fails with a {@link DatasetException} when an
     * entity is null or not an instance of the expected class.
     *
     * @param <E> entity type
     */
    @SuppressWarnings(value={"SE_NO_SERIALVERSIONID"}, justification="Purposely not supported across versions")
    public static class CheckEntityClass<E>
    extends MapFn<E, E> {
        private final Class<?> entityClass;

        /** @param entityClass class every entity must be an instance of */
        public CheckEntityClass(Class<?> entityClass) {
            this.entityClass = entityClass;
        }

        @Override
        public E map(E input) {
            // isInstance is false for null, so null entities are rejected as well.
            if (this.entityClass.isInstance(input)) {
                return input;
            }
            throw new DatasetException("Object does not match expected type " + this.entityClass + ": " + String.valueOf(input));
        }
    }
}

