/*
 * Decompiled with CFR 0.152.
 */
package org.kitesdk.tools;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.crunch.DoFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.token.Token;
import org.apache.thrift.TException;
import org.kitesdk.compat.DynMethods;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.View;
import org.kitesdk.data.crunch.CrunchDatasets;
import org.kitesdk.tools.TaskUtil;

public class TransformTask<S, T>
extends Configured {
    private static DynMethods.StaticMethod getEnumByName = new DynMethods.Builder("valueOf").impl("org.apache.hadoop.mapred.Task$Counter", new Class[]{String.class}).impl("org.apache.hadoop.mapreduce.TaskCounter", new Class[]{String.class}).defaultNoop().buildStatic();
    private static final Enum<?> MAP_INPUT_RECORDS = (Enum)getEnumByName.invoke(new Object[]{"MAP_INPUT_RECORDS"});
    private static final String LOCAL_FS_SCHEME = "file";
    private final View<S> from;
    private final View<T> to;
    private final DoFn<S, T> transform;
    private boolean compact = true;
    private int numWriters = -1;
    private int numPartitionWriters = -1;
    private Target.WriteMode mode = Target.WriteMode.APPEND;
    private long count = 0L;
    private static final Text HIVE_MS_TOKEN_ALIAS = new Text("HIVE_METASTORE_TOKEN");

    public TransformTask(View<S> from, View<T> to, DoFn<S, T> transform) {
        this.from = from;
        this.to = to;
        this.transform = transform;
    }

    public long getCount() {
        return this.count;
    }

    public TransformTask noCompaction() {
        this.compact = false;
        this.numWriters = 0;
        this.numPartitionWriters = 0;
        return this;
    }

    public TransformTask setNumWriters(int numWriters) {
        Preconditions.checkArgument((numWriters >= 0 ? 1 : 0) != 0, (Object)("Invalid number of reducers: " + numWriters));
        if (numWriters == 0) {
            this.noCompaction();
        } else {
            this.numWriters = numWriters;
        }
        return this;
    }

    public TransformTask setFilesPerPartition(int filesPerPartition) {
        Preconditions.checkArgument((filesPerPartition > 0 ? 1 : 0) != 0, (Object)("Invalid number of files per partition: " + filesPerPartition));
        this.numPartitionWriters = filesPerPartition;
        return this;
    }

    public TransformTask setWriteMode(Target.WriteMode mode) {
        Preconditions.checkArgument((mode != Target.WriteMode.CHECKPOINT ? 1 : 0) != 0, (Object)"Checkpoint is not an allowed write mode");
        this.mode = mode;
        return this;
    }

    public PipelineResult run() throws IOException {
        if (TransformTask.isLocal(this.from.getDataset()) || TransformTask.isLocal(this.to.getDataset())) {
            Configuration conf = new Configuration(this.getConf());
            conf.set("mapreduce.framework.name", "local");
            conf.set("fs.defaultFS", "file:/");
            conf.set("fs.default.name", "file:/");
            this.setConf(conf);
        }
        if (TransformTask.isHive(this.from) || TransformTask.isHive(this.to)) {
            this.setConf(TransformTask.addHiveDelegationToken(this.getConf()));
        }
        AvroType<T> toPType = TransformTask.ptype(this.to);
        CheckEntityClass validate = new CheckEntityClass(this.to.getType());
        TaskUtil.configure(this.getConf()).addJarPathForClass(HiveConf.class);
        MRPipeline pipeline = new MRPipeline(((Object)((Object)this)).getClass(), this.getConf());
        PCollection collection = pipeline.read((Source)CrunchDatasets.asSource(this.from)).parallelDo(this.transform, toPType).parallelDo(validate, toPType);
        if (this.compact) {
            collection = CrunchDatasets.partition((PCollection)collection, this.to, (int)this.numWriters, (int)this.numPartitionWriters);
        }
        pipeline.write(collection, CrunchDatasets.asTarget(this.to), this.mode);
        PipelineResult result = pipeline.done();
        PipelineResult.StageResult sr = (PipelineResult.StageResult)Iterables.getFirst((Iterable)result.getStageResults(), null);
        if (sr != null && MAP_INPUT_RECORDS != null) {
            this.count = sr.getCounterValue(MAP_INPUT_RECORDS);
        }
        return result;
    }

    private static boolean isLocal(Dataset<?> dataset) {
        URI location = dataset.getDescriptor().getLocation();
        return location != null && LOCAL_FS_SCHEME.equals(location.getScheme());
    }

    private static <T> AvroType<T> ptype(View<T> view) {
        Class recordClass = view.getType();
        if (GenericRecord.class.isAssignableFrom(recordClass)) {
            return Avros.generics((Schema)view.getDataset().getDescriptor().getSchema());
        }
        return Avros.records((Class)recordClass);
    }

    private static boolean isHive(View<?> view) {
        return "hive".equals(URI.create(view.getUri().getSchemeSpecificPart()).getScheme());
    }

    private static Configuration addHiveDelegationToken(Configuration conf) {
        JobConf jobConf = new JobConf(conf);
        try {
            if (conf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) {
                HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(new HiveConf());
                String hiveTokenStr = metaStoreClient.getDelegationToken("yarn");
                Token hiveToken = new Token();
                hiveToken.decodeFromUrlString(hiveTokenStr);
                jobConf.getCredentials().addToken(HIVE_MS_TOKEN_ALIAS, hiveToken);
            }
            return jobConf;
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to obtain Hive delegation token", e);
        }
        catch (TException e) {
            throw new RuntimeException("Unable to obtain Hive delegation token", e);
        }
    }

    @SuppressWarnings(value={"SE_NO_SERIALVERSIONID"}, justification="Purposely not supported across versions")
    public static class CheckEntityClass<E>
    extends MapFn<E, E> {
        private final Class<?> entityClass;

        public CheckEntityClass(Class<?> entityClass) {
            this.entityClass = entityClass;
        }

        public E map(E input) {
            if (input != null && this.entityClass.isAssignableFrom(input.getClass())) {
                return input;
            }
            throw new DatasetException("Object does not match expected type " + this.entityClass + ": " + String.valueOf(input));
        }
    }
}

