/*
 * Decompiled with CFR 0.152.
 */
package ml.shifu.guagua.yarn;

import java.io.Closeable;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import ml.shifu.guagua.GuaguaRuntimeException;
import ml.shifu.guagua.GuaguaService;
import ml.shifu.guagua.hadoop.io.GuaguaInputSplit;
import ml.shifu.guagua.io.Bytable;
import ml.shifu.guagua.io.GuaguaFileSplit;
import ml.shifu.guagua.master.GuaguaMasterService;
import ml.shifu.guagua.util.Progressable;
import ml.shifu.guagua.worker.GuaguaWorkerService;
import ml.shifu.guagua.yarn.GuaguaIterationStatus;
import ml.shifu.guagua.yarn.util.GsonUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.common.IOUtils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.serialization.ClassResolvers;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a single Guagua task inside a YARN container.
 *
 * <p>The task reads its {@link GuaguaInputSplit} from the configuration key
 * {@code guagua.yarn.input.split.<partition>}. If the split is flagged as master, a
 * {@link GuaguaMasterService} is run; otherwise a {@link GuaguaWorkerService} is run over the
 * file splits contained in the input split. On each "last update" progress callback a
 * {@link GuaguaIterationStatus} is serialized to JSON and written to the RPC endpoint at
 * {@code rpcHostName:rpcPort}.
 *
 * @param <MASTER_RESULT> master result type
 * @param <WORKER_RESULT> worker result type
 */
public class GuaguaYarnTask<MASTER_RESULT extends Bytable, WORKER_RESULT extends Bytable> {
    private static final Logger LOG = LoggerFactory.getLogger(GuaguaYarnTask.class);

    /** Upper bound, in seconds, for waiting on a status write or on the channel close. */
    private static final long RPC_WAIT_SECONDS = 10L;

    private int partition;
    private ApplicationAttemptId appAttemptId;
    private ContainerId containerId;
    private ApplicationId appId;
    private Configuration yarnConf;
    private boolean isMaster;
    private GuaguaService guaguaService;
    private GuaguaInputSplit inputSplit;
    private int rpcPort = 12345;
    private String rpcHostName;
    private Channel rpcClientChannel;
    private ClientBootstrap rpcClient;

    /**
     * Creates a task for the given partition and loads its input split from {@code conf}.
     *
     * @param appAttemptId application attempt this task belongs to
     * @param containerId  container this task runs in
     * @param partition    partition index; also used to look up the input split in {@code conf}
     * @param rpcHostName  host of the RPC endpoint that receives iteration status
     * @param rpcPort      port of the RPC endpoint, as a decimal string
     * @param conf         configuration holding the JSON-encoded input split
     * @throws NumberFormatException if {@code rpcPort} is not a parsable integer
     */
    public GuaguaYarnTask(ApplicationAttemptId appAttemptId, ContainerId containerId, int partition, String rpcHostName, String rpcPort, Configuration conf) {
        // Fields and parameters are used directly here: calling the overridable getters from a
        // constructor would run subclass code before the subclass is initialized.
        this.appAttemptId = appAttemptId;
        this.containerId = containerId;
        this.partition = partition;
        this.rpcHostName = rpcHostName;
        this.rpcPort = Integer.parseInt(rpcPort);
        LOG.info("current partition:{}", partition);
        this.appId = appAttemptId.getApplicationId();
        this.yarnConf = conf;
        this.inputSplit = GsonUtils.fromJson(conf.get("guagua.yarn.input.split." + partition), GuaguaInputSplit.class);
        LOG.info("current input split:{}", this.inputSplit);
    }

    /**
     * Creates the master or worker service according to the input split, initializes and starts
     * it, then connects the RPC client.
     *
     * @throws IllegalStateException if no input split is available for this partition
     * @throws GuaguaRuntimeException if the RPC endpoint cannot be reached
     */
    protected void setup() {
        GuaguaInputSplit split = this.getInputSplit();
        if (split == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IllegalStateException("No input split found for partition " + this.getPartition());
        }
        this.setMaster(split.isMaster());
        if (this.isMaster()) {
            this.setGuaguaService(new GuaguaMasterService());
        } else {
            this.setGuaguaService(new GuaguaWorkerService());
            LinkedList<GuaguaFileSplit> splits = new LinkedList<GuaguaFileSplit>();
            for (FileSplit fileSplit : split.getFileSplits()) {
                splits.add(new GuaguaFileSplit(fileSplit.getPath().toString(), fileSplit.getStart(), fileSplit.getLength()));
            }
            this.getGuaguaService().setSplits(splits);
        }
        Properties props = this.replaceConfToProps();
        this.getGuaguaService().setAppId(this.getAppId().toString());
        this.getGuaguaService().setContainerId(this.getPartition() + "");
        this.getGuaguaService().init(props);
        this.getGuaguaService().start();
        this.initRPCClient();
    }

    /**
     * Builds the Netty client bootstrap and connects to {@code rpcHostName:rpcPort}, blocking
     * until the connect attempt finishes.
     *
     * @throws GuaguaRuntimeException if the connection attempt does not succeed
     */
    private void initRPCClient() {
        this.rpcClient = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor()));
        this.rpcClient.setPipelineFactory(new ChannelPipelineFactory() {

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        new ObjectEncoder(),
                        new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())),
                        new ClientHandler());
            }
        });
        ChannelFuture future = this.rpcClient.connect(new InetSocketAddress(this.rpcHostName, this.rpcPort));
        LOG.info("Connect to {}:{}", this.rpcHostName, this.rpcPort);
        future.awaitUninterruptibly();
        // Keep the channel reference even on failure so cleanup() can close it.
        this.rpcClientChannel = future.getChannel();
        if (!future.isSuccess()) {
            Throwable cause = future.getCause();
            if (cause == null) {
                cause = new IllegalStateException("Connection to " + this.rpcHostName + ":" + this.rpcPort + " did not complete successfully.");
            }
            LOG.error("Failed to connect to RPC endpoint " + this.rpcHostName + ":" + this.rpcPort, cause);
            throw new GuaguaRuntimeException(cause);
        }
    }

    /**
     * Copies every entry of the configuration into a {@link Properties} object. Entries whose
     * key starts with {@code guagua} are additionally logged at debug level.
     *
     * @return properties holding all configuration entries
     */
    private Properties replaceConfToProps() {
        Properties properties = new Properties();
        // Hoisted: the log level does not change per entry.
        boolean debugEnabled = LOG.isDebugEnabled();
        for (Map.Entry<String, String> entry : this.getYarnConf()) {
            properties.put(entry.getKey(), entry.getValue());
            if (debugEnabled && entry.getKey().startsWith("guagua")) {
                LOG.debug("{}:{}", entry.getKey(), entry.getValue());
            }
        }
        return properties;
    }

    /**
     * Reads a serialized split from {@code file} starting at {@code offset}: first the split
     * class name, then the split itself using the deserializer registered for that class.
     * This helper is not referenced elsewhere in this class.
     *
     * @param file   file holding serialized split data
     * @param offset byte offset of the split record inside {@code file}
     * @param <T>    expected split type; the cast is unchecked
     * @return the deserialized split
     * @throws IOException if reading fails or the split class cannot be loaded
     */
    @SuppressWarnings("unchecked")
    private <T> T getSplitDetails(Path file, long offset) throws IOException {
        FileSystem fs = file.getFileSystem(this.getYarnConf());
        FSDataInputStream inFile = null;
        Object split;
        try {
            inFile = fs.open(file);
            inFile.seek(offset);
            String className = Text.readString(inFile);
            Class<?> cls;
            try {
                cls = this.getYarnConf().getClassByName(className);
            } catch (ClassNotFoundException ce) {
                throw new IOException(String.format("Split class %s not found", className), ce);
            }
            SerializationFactory factory = new SerializationFactory(this.getYarnConf());
            Deserializer<?> deserializer = factory.getDeserializer(cls);
            deserializer.open(inFile);
            split = deserializer.deserialize(null);
        } finally {
            IOUtils.closeStream(inFile);
        }
        // Unchecked: the caller chooses T; the stored class name determines the runtime type.
        return (T) split;
    }

    /**
     * Sets the task up, runs the Guagua service to completion while reporting progress over
     * RPC, and always cleans up afterwards.
     *
     * @throws GuaguaRuntimeException wrapping any exception raised during setup or execution
     */
    public void run() {
        try {
            this.setup();
            this.getGuaguaService().run(new Progressable() {

                @Override
                public void progress(int currentIteration, int totalIteration, String status, boolean isLastUpdate, boolean isKill) {
                    if (!isLastUpdate) {
                        return;
                    }
                    // Guard against division by zero; widen to long so the multiply cannot overflow.
                    int percent = totalIteration > 0 ? (int) (currentIteration * 100L / totalIteration) : 0;
                    LOG.info("Application progress: {}%.", percent);
                    GuaguaIterationStatus gi = new GuaguaIterationStatus(GuaguaYarnTask.this.partition, currentIteration, totalIteration);
                    gi.setKillContainer(isKill);
                    LOG.info("Send GuaguaIterationStatus: {}.", gi);
                    ChannelFuture channelFuture = GuaguaYarnTask.this.rpcClientChannel.write(GsonUtils.toJson(gi));
                    try {
                        if (!channelFuture.await(RPC_WAIT_SECONDS, TimeUnit.SECONDS)) {
                            LOG.warn("Sending GuaguaIterationStatus did not complete within {} seconds.", RPC_WAIT_SECONDS);
                        } else if (!channelFuture.isSuccess()) {
                            LOG.warn("Sending GuaguaIterationStatus failed.", channelFuture.getCause());
                        }
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        } catch (Exception e) {
            LOG.error("Error in guagua main run method.", e);
            throw new GuaguaRuntimeException(e);
        } finally {
            this.cleanup();
        }
    }

    /**
     * Closes the RPC channel, releases the RPC client resources and stops the Guagua service.
     * Every step tolerates the corresponding component never having been created, so a failure
     * early in {@link #setup()} is not masked by an exception thrown from here.
     */
    protected void cleanup() {
        try {
            // Close the channel before releasing the factory resources it depends on.
            if (this.rpcClientChannel != null) {
                this.rpcClientChannel.close().awaitUninterruptibly(RPC_WAIT_SECONDS, TimeUnit.SECONDS);
            }
            if (this.rpcClient != null) {
                this.rpcClient.shutdown();
                this.rpcClient.releaseExternalResources();
            }
        } finally {
            // Stop the service even when the RPC teardown above throws.
            GuaguaService service = this.getGuaguaService();
            if (service != null) {
                service.stop();
            }
        }
    }

    /** @return the service run by this task, or {@code null} before {@link #setup()} created it */
    public GuaguaService getGuaguaService() {
        return this.guaguaService;
    }

    /** @param guaguaService the service this task should run */
    public void setGuaguaService(GuaguaService guaguaService) {
        this.guaguaService = guaguaService;
    }

    /** @return the partition index of this task */
    public int getPartition() {
        return this.partition;
    }

    /** @param partition the partition index of this task */
    public void setPartition(int partition) {
        this.partition = partition;
    }

    /** @return the application attempt id */
    public ApplicationAttemptId getAppAttemptId() {
        return this.appAttemptId;
    }

    /** @param appAttemptId the application attempt id */
    public void setAppAttemptId(ApplicationAttemptId appAttemptId) {
        this.appAttemptId = appAttemptId;
    }

    /** @return the container id */
    public ContainerId getContainerId() {
        return this.containerId;
    }

    /** @param containerId the container id */
    public void setContainerId(ContainerId containerId) {
        this.containerId = containerId;
    }

    /** @return {@code true} if this task runs the master service */
    public boolean isMaster() {
        return this.isMaster;
    }

    /** @param isMaster whether this task runs the master service */
    public void setMaster(boolean isMaster) {
        this.isMaster = isMaster;
    }

    /** @return the configuration used by this task */
    public Configuration getYarnConf() {
        return this.yarnConf;
    }

    /** @param yarnConf the configuration used by this task */
    public void setYarnConf(YarnConfiguration yarnConf) {
        this.yarnConf = yarnConf;
    }

    /** @return the application id */
    public ApplicationId getAppId() {
        return this.appId;
    }

    /** @param appId the application id */
    public void setAppId(ApplicationId appId) {
        this.appId = appId;
    }

    /** @return the input split of this task; may be {@code null} if none was configured */
    public GuaguaInputSplit getInputSplit() {
        return this.inputSplit;
    }

    /** @param inputSplit the input split of this task */
    public void setInputSplit(GuaguaInputSplit inputSplit) {
        this.inputSplit = inputSplit;
    }

    /**
     * Container entry point. Expects exactly seven arguments, of which the last three are the
     * partition, the RPC host name and the RPC port. The container id and user name are read
     * from the environment. Exits with status 2 on any failure and 0 otherwise.
     *
     * @param args command line arguments
     * @throws IllegalStateException    if the number of arguments is not seven
     * @throws IllegalArgumentException if the container id environment variable is missing
     */
    public static void main(String[] args) {
        LOG.info("args:{}", Arrays.toString(args));
        if (args.length != 7) {
            throw new IllegalStateException(String.format("GuaguaYarnTask could not construct a TaskAttemptID for the Guagua job from args: %s", Arrays.toString(args)));
        }
        String containerIdString = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
        if (containerIdString == null) {
            throw new IllegalArgumentException("ContainerId not found in env vars.");
        }
        ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
        ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
        try {
            YarnConfiguration conf = new YarnConfiguration();
            String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
            conf.set("mapreduce.job.user.name", jobUserName);
            UserGroupInformation.setConfiguration(conf);
            Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
            LOG.info("Executing with tokens:");
            for (Token<?> token : credentials.getAllTokens()) {
                LOG.info("{}", token);
            }
            // Run the task as the job user, carrying over the current credentials.
            UserGroupInformation appTaskUGI = UserGroupInformation.createRemoteUser(jobUserName);
            appTaskUGI.addCredentials(credentials);
            final GuaguaYarnTask<Bytable, Bytable> guaguaYarnTask = new GuaguaYarnTask<Bytable, Bytable>(
                    appAttemptId, containerId, Integer.parseInt(args[args.length - 3]), args[args.length - 2], args[args.length - 1], conf);
            appTaskUGI.doAs(new PrivilegedAction<Void>() {

                @Override
                public Void run() {
                    guaguaYarnTask.run();
                    return null;
                }
            });
        } catch (Throwable t) {
            LOG.error("GuaguaYarnTask threw a top-level exception, failing task", t);
            System.exit(2);
        }
        System.exit(0);
    }

    static {
        Configuration.addDefaultResource("guagua-conf.xml");
    }

    /**
     * Upstream handler for the RPC client channel: logs connection and received messages, and
     * closes the channel when an exception is reported.
     */
    public static class ClientHandler
    extends SimpleChannelUpstreamHandler {
        @Override
        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            super.handleUpstream(ctx, e);
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
            LOG.info("Channel connected:{}", e.getValue());
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            LOG.info("Receive status:{}", e.getMessage());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            // Record the cause before closing; otherwise the failure leaves no trace.
            LOG.error("Exception on RPC client channel, closing channel.", e.getCause());
            e.getChannel().close();
        }
    }
}

