/*
 * Decompiled with CFR 0.152.
 */
package ml.shifu.guagua.master;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import ml.shifu.guagua.BasicCoordinator;
import ml.shifu.guagua.GuaguaRuntimeException;
import ml.shifu.guagua.io.Bytable;
import ml.shifu.guagua.io.BytableWrapper;
import ml.shifu.guagua.io.Combinable;
import ml.shifu.guagua.io.HaltBytable;
import ml.shifu.guagua.io.NettyBytableDecoder;
import ml.shifu.guagua.io.NettyBytableEncoder;
import ml.shifu.guagua.io.Serializer;
import ml.shifu.guagua.master.AbstractMasterCoordinator;
import ml.shifu.guagua.master.MasterContext;
import ml.shifu.guagua.util.AppendList;
import ml.shifu.guagua.util.BytableDiskList;
import ml.shifu.guagua.util.BytableMemoryDiskList;
import ml.shifu.guagua.util.ClassUtils;
import ml.shifu.guagua.util.NetworkUtils;
import ml.shifu.guagua.util.NumberFormatUtils;
import ml.shifu.guagua.util.ReflectionUtils;
import ml.shifu.guagua.util.StringUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyMasterCoordinator<MASTER_RESULT extends Bytable, WORKER_RESULT extends Bytable>
extends AbstractMasterCoordinator<MASTER_RESULT, WORKER_RESULT> {
    /** Logger shared by the coordinator and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger(NettyMasterCoordinator.class);
    /**
     * Monitor used by the coordinator methods and by the Netty handler whenever they touch the
     * worker-result state ({@link #iterResults}, {@link #indexMap}, {@link #currentInteration},
     * {@link #canUpdateWorkerResultMap}). It is static, so it is shared by all instances.
     */
    private static final Object LOCK = new Object();
    /** Netty server that receives worker messages; created in {@code startNettyServer}. */
    private ServerBootstrap messageServer;
    /** Port the Netty server is bound to; also written into the master znode value. */
    private int messageServerPort;
    /** Worker results collected so far; replaced by a fresh list on every {@code clear}. */
    private AppendList<WorkerResultWrapper> iterResults;
    /** Container id -> position at which its result was appended; used to ignore repeated messages. */
    private Map<String, Integer> indexMap = new HashMap<String, Integer>();
    /** Iteration whose worker results are currently accepted (identifier spelling kept as-is). */
    private int currentInteration;
    /** Total iteration count taken from the master context in {@code preApplication}. */
    private int totalInteration;
    /** While {@code false}, the Netty handler drops incoming worker messages. */
    private boolean canUpdateWorkerResultMap = true;
    /** Master result of the most recent {@code postIteration}; inspected for a halt flag on stop messages. */
    private MASTER_RESULT masterResult = null;
    /** Worker result class name taken from the master context; used to deserialize worker payloads. */
    private String workerClassName = null;
    /**
     * Worker serializer used by {@code WorkerResultWrapper} for its own (de)serialization.
     * Static but assigned from {@code preApplication}, i.e. from an instance method.
     */
    private static Serializer<Bytable> serializer;
    /** Pool (3 threads, created in {@code preApplication}) that deletes outdated master znodes. */
    private ExecutorService cleanOldZkDataThreadPool;

    /**
     * Runs the parent initialization and then builds the list that collects worker results.
     *
     * @param props job properties, handed to the parent class and to the result-list set-up
     */
    @Override
    protected void initialize(Properties props) {
        super.initialize(props);
        initIterResults(props);
    }

    /**
     * Tells whether the named worker result class implements {@link Combinable}.
     *
     * @param workerClassName fully qualified class name, may be {@code null}
     * @return {@code true} only when the name is non-null, the class can be loaded and it is
     *         assignable to {@code Combinable}; {@code false} when the name is {@code null} or the
     *         class cannot be found
     */
    private boolean isWorkerCombinable(String workerClassName) {
        if (workerClassName == null) {
            return false;
        }
        Class<?> workerClass;
        try {
            workerClass = Class.forName(workerClassName);
        }
        catch (ClassNotFoundException notOnClasspath) {
            return false;
        }
        return Combinable.class.isAssignableFrom(workerClass);
    }

    /**
     * Creates a fresh {@link #iterResults} list while holding {@code LOCK}.
     *
     * <p>If {@code guagua.master.result.nonspill} is "true" (the default) and the class named by
     * {@code guagua.worker.result.class} is {@link Combinable}, an in-memory merging list is used
     * with the threshold from {@code guagua.master.result.merge.threshold} (default 10).
     * Otherwise a memory list backed by a disk list is used, whose memory budget is
     * {@code guagua.master.workeresults.memoryFraction} (default 0.7) of the JVM max memory.
     *
     * @param props job properties
     * @throws NumberFormatException if the memory fraction property is not a valid double
     */
    private void initIterResults(Properties props) {
        synchronized (LOCK) {
            String nonSpillSetting = props.getProperty("guagua.master.result.nonspill", "true");
            boolean mergeInMemory = "true".equalsIgnoreCase(nonSpillSetting)
                    && this.isWorkerCombinable(props.getProperty("guagua.worker.result.class"));
            if (mergeInMemory) {
                String thresholdSetting = props.getProperty("guagua.master.result.merge.threshold", "10");
                this.iterResults = new MergeWorkerResultList(NumberFormatUtils.getInt(thresholdSetting, 10));
                return;
            }

            // The disk list is created first, named after the current time, exactly as before.
            String diskListName = System.currentTimeMillis() + "";
            BytableDiskList spillStore = new BytableDiskList(diskListName, WorkerResultWrapper.class.getName());
            double fraction = Double.parseDouble(props.getProperty("guagua.master.workeresults.memoryFraction", "0.7"));
            long memoryBudget = (long)((double)Runtime.getRuntime().maxMemory() * fraction);
            LOG.info("Memory size in BytableMemoryDiskList for worker result list: {}", Long.valueOf(memoryBudget));
            this.iterResults = new BytableMemoryDiskList<WorkerResultWrapper>(memoryBudget, spillStore);
        }
    }

    /**
     * Prepares the master before the first iteration: initializes state, restores the previous
     * master result when this is not the initial iteration, starts the Netty message server and
     * publishes the server address in ZooKeeper.
     *
     * @param context master context supplying properties, application id and iteration numbers
     */
    @Override
    public void preApplication(final MasterContext<MASTER_RESULT, WORKER_RESULT> context) {
        this.initialize(context.getProps());
        // Pool used later by postIteration to delete outdated znodes.
        this.cleanOldZkDataThreadPool = Executors.newFixedThreadPool(3);
        // Static field: WorkerResultWrapper (a static nested class) reads it.
        serializer = this.getWorkerSerializer();
        this.workerClassName = context.getWorkerResultClassName();
        this.totalInteration = context.getTotalIteration();
        // NOTE(review): FailOverCommand is defined in the parent class; presumably it detects a
        // restarted master and adjusts the context iteration - confirm against AbstractMasterCoordinator.
        new AbstractMasterCoordinator.FailOverCommand(context).execute();
        if (!context.isInitIteration()) {
            // Not the initial iteration: hand the znode paths of the context's current iteration
            // to setMasterResult (presumably reloads the master result - confirm in parent class).
            new BasicCoordinator.BasicCoordinatorCommand(){

                @Override
                public void doExecute() throws KeeperException, InterruptedException {
                    String appId = context.getAppId();
                    int lastIteration = context.getCurrentIteration();
                    String appMasterNode = NettyMasterCoordinator.this.getCurrentMasterNode(appId, lastIteration).toString();
                    String appMasterSplitNode = NettyMasterCoordinator.this.getCurrentMasterSplitNode(appId, lastIteration).toString();
                    NettyMasterCoordinator.this.setMasterResult(context, appMasterNode, appMasterSplitNode);
                }
            }.execute();
        }
        // The server must be up before its port is written to the master znode below.
        this.startNettyServer(context.getProps());
        Object object = LOCK;
        synchronized (object) {
            this.currentInteration = context.isInitIteration() ? 0 : context.getCurrentIteration();
        }
        this.initMasterZnode(context);
        if (!context.isInitIteration()) {
            // Not the initial iteration: skip the reset of the collected results below.
            return;
        }
        this.clear(context.getProps());
        LOG.info("All workers are initiliazed successfully.");
    }

    /**
     * Drops everything collected so far and re-arms the handler: closes and clears the current
     * result list, creates a new one, forgets which containers have reported and allows updates
     * again. All of this happens while holding {@code LOCK}.
     *
     * @param props job properties used to build the replacement result list
     */
    private void clear(Properties props) {
        synchronized (LOCK) {
            closeIterResults();
            iterResults.clear();
            initIterResults(props);
            indexMap.clear();
            canUpdateWorkerResultMap = true;
        }
    }

    /**
     * Invokes a {@code close} method on the current result list through reflection, if the
     * list's class has one.
     *
     * @throws RuntimeException wrapping any failure other than the method being absent
     */
    private void closeIterResults() {
        try {
            Method closer = ClassUtils.getDeclaredMethod("close", this.iterResults.getClass());
            if (closer == null) {
                return;
            }
            closer.invoke(this.iterResults, (Object[])null);
        }
        catch (NoSuchMethodException noCloseMethod) {
            // The list type has nothing to close.
        }
        catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Publishes this master's address in ZooKeeper.
     *
     * <p>First makes sure the master base znode exists. Then it writes
     * {@code host:port:version} into the iteration-0 master znode: version 1 if the znode is
     * new, otherwise the stored version plus one.
     *
     * @param context master context supplying the application id
     * @throws GuaguaRuntimeException wrapping any failure other than "node already exists"
     */
    private void initMasterZnode(MasterContext<MASTER_RESULT, WORKER_RESULT> context) {
        String znode = null;

        // Step 1: base znode for master data.
        try {
            znode = this.getMasterBaseNode(context.getAppId()).toString();
            this.getZooKeeper().createExt(znode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
        }
        catch (KeeperException.NodeExistsException alreadyThere) {
            LOG.warn("Node exists: {}", (Object)znode);
        }
        catch (Exception failure) {
            throw new GuaguaRuntimeException(failure);
        }

        // Step 2: iteration-0 znode carrying the server address and a version counter.
        try {
            znode = this.getCurrentMasterNode(context.getAppId(), 0).toString();
            boolean firstRegistration = this.getZooKeeper().exists(znode, false) == null;
            if (firstRegistration) {
                String serverInfo = InetAddress.getLocalHost().getHostName() + ":" + this.messageServerPort + ":" + 1;
                byte[] payload = serverInfo.getBytes(Charset.forName("UTF-8"));
                this.getZooKeeper().createExt(znode, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
                LOG.info("Master znode initialization with server info {}", (Object)serverInfo);
            } else {
                byte[] stored = this.getZooKeeper().getData(znode, null, null);
                String previousInfo = new String(stored, Charset.forName("UTF-8"));
                // Third ':'-separated field is the version written by the previous registration.
                int previousVersion = NumberFormatUtils.getInt(previousInfo.split(":")[2], true);
                String serverInfo = InetAddress.getLocalHost().getHostName() + ":" + this.messageServerPort + ":" + (previousVersion + 1);
                byte[] payload = serverInfo.getBytes(Charset.forName("UTF-8"));
                this.getZooKeeper().createOrSetExt(znode, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false, -1);
                LOG.info("Master znode re-initialization with server info {}", (Object)serverInfo);
            }
        }
        catch (KeeperException.NodeExistsException alreadyThere) {
            LOG.warn("Node exists: {}", (Object)znode);
        }
        catch (Exception failure) {
            throw new GuaguaRuntimeException(failure);
        }
    }

    /**
     * Starts the Netty server that receives worker messages.
     *
     * <p>The port comes from {@code guagua.netty.sever.port} (default 44323) and is passed through
     * {@code NetworkUtils.getValidServerPort}. If binding fails with a {@link ChannelException},
     * a port is requested once more and the bind is retried a single time.
     *
     * @param props job properties
     * @throws GuaguaRuntimeException if the local host name cannot be resolved for logging
     */
    private void startNettyServer(Properties props) {
        this.messageServerPort = NumberFormatUtils.getInt(props.getProperty("guagua.netty.sever.port"), 44323);
        this.messageServerPort = NetworkUtils.getValidServerPort(this.messageServerPort);

        // First executor: fixed pool of 8 threads; second executor: cached pool.
        Executor fixedPool = Executors.newFixedThreadPool(8, new MasterThreadFactory());
        Executor cachedPool = Executors.newCachedThreadPool(new MasterThreadFactory());
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(fixedPool, cachedPool);
        this.messageServer = new ServerBootstrap(channelFactory);
        this.messageServer.setPipelineFactory(new ChannelPipelineFactory(){

            public ChannelPipeline getPipeline() throws Exception {
                ChannelHandler[] handlers = new ChannelHandler[]{new NettyBytableEncoder(), new NettyBytableDecoder(), new ServerHandler()};
                return Channels.pipeline(handlers);
            }
        });

        try {
            SocketAddress address = new InetSocketAddress(this.messageServerPort);
            this.messageServer.bind(address);
        }
        catch (ChannelException bindFailure) {
            LOG.warn(bindFailure.getMessage() + "; try to rebind again.");
            this.messageServerPort = NetworkUtils.getValidServerPort(this.messageServerPort);
            SocketAddress retryAddress = new InetSocketAddress(this.messageServerPort);
            this.messageServer.bind(retryAddress);
        }

        try {
            String hostName = InetAddress.getLocalHost().getHostName();
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            LOG.info("Master netty server is started at {}", (Object)(hostName + ":" + hostAddress + ":" + this.messageServerPort));
        }
        catch (UnknownHostException unresolved) {
            throw new GuaguaRuntimeException(unresolved);
        }
    }

    /**
     * Waits until enough workers have delivered their results for the current iteration, then
     * freezes the result list and exposes it to the master computation through
     * {@code context.setWorkerResults}.
     *
     * <p>Steps:
     * <ol>
     * <li>Under {@code LOCK}, record the iteration number and allow the Netty handler to append.</li>
     * <li>Poll with a retry command until {@code isTerminated} reports that waiting may stop. The
     * timeout handed to it is 60000 ms for the first and the last iteration and
     * {@code context.getMinWorkersTimeOut()} otherwise.</li>
     * <li>Under {@code LOCK}, forbid further appends and switch the list state.</li>
     * <li>Publish an {@link Iterable} that yields only results whose iteration matches.</li>
     * </ol>
     *
     * <p>The published iterator follows the {@link Iterator} contract: {@code hasNext()} may be
     * called repeatedly without skipping elements, and {@code next()} advances on its own. As
     * before, once {@code hasNext()} has returned {@code false} the underlying cursor is rewound,
     * so a later call starts a new pass over the results.
     *
     * @param context master context of the running iteration
     */
    @Override
    public void preIteration(final MasterContext<MASTER_RESULT, WORKER_RESULT> context) {
        synchronized (LOCK) {
            this.currentInteration = context.getCurrentIteration();
            this.canUpdateWorkerResultMap = true;
        }
        long start = System.nanoTime();
        new BasicCoordinator.RetryCoordinatorCommand(this.isFixedTime(), this.getSleepTime()){
            /** Index of the next progress milestone (25/50/75/100% of workers) not yet logged. */
            private int nextIndex = 0;

            @Override
            public boolean retryExecution() throws KeeperException, InterruptedException {
                int doneWorkers;
                synchronized (LOCK) {
                    doneWorkers = NettyMasterCoordinator.this.iterResults.size();
                }
                // Log progress once per milestone reached since the previous poll.
                int[] dumpArray = new int[]{context.getWorkers() / 4, context.getWorkers() * 2 / 4, context.getWorkers() * 3 / 4, context.getWorkers()};
                for (int i = this.nextIndex; i < dumpArray.length; ++i) {
                    if (doneWorkers < dumpArray[i]) continue;
                    this.nextIndex = i + 1;
                    LOG.info("Iteration {}, workers compelted: {}, still {} workers are not synced (fixed).", new Object[]{context.getCurrentIteration(), doneWorkers, context.getWorkers() - doneWorkers});
                }
                long timeOut = context.isFirstIteration() || context.getCurrentIteration() == context.getTotalIteration() ? 60000L : context.getMinWorkersTimeOut();
                boolean isTerminated = this.isTerminated(doneWorkers, context.getWorkers(), context.getMinWorkersRatio(), timeOut);
                if (isTerminated) {
                    synchronized (LOCK) {
                        NettyMasterCoordinator.this.canUpdateWorkerResultMap = false;
                    }
                    // Report the timeout that was actually applied, not the 60000 ms constant.
                    LOG.info("Iteration {}, master waiting is terminated by workers {} doneWorkers {} minWorkersRatio {} minWorkersTimeOut {}.", new Object[]{context.getCurrentIteration(), context.getWorkers(), doneWorkers, context.getMinWorkersRatio(), timeOut});
                }
                return isTerminated;
            }
        }.execute();
        synchronized (LOCK) {
            this.canUpdateWorkerResultMap = false;
            this.iterResults.switchState();
        }
        if (this.iterResults instanceof BytableMemoryDiskList) {
            LOG.info("Worker result memory count in iteration {} is {}.", (Object)this.currentInteration, (Object)((BytableMemoryDiskList)this.iterResults).getMemoryCount());
            LOG.info("Worker result dist count in iteration {} is {}.", (Object)this.currentInteration, (Object)((BytableMemoryDiskList)this.iterResults).getDiskCount());
        } else {
            // initIterResults only ever creates the two list types, so this cast is the other one.
            LOG.info("Worker result merge list raw and merged count in iteration {} are {}, {}.", new Object[]{this.currentInteration, ((MergeWorkerResultList)this.iterResults).size(), ((MergeWorkerResultList)this.iterResults).mergedSize()});
        }
        final int currentIter = this.currentInteration;
        context.setWorkerResults(new Iterable<WORKER_RESULT>(){

            @Override
            public Iterator<WORKER_RESULT> iterator() {
                return new Iterator<WORKER_RESULT>(){
                    /** Cursor over the raw list; created lazily on first use. */
                    private Iterator<WorkerResultWrapper> localItr;
                    /** Element fetched by {@code hasNext()} and not yet handed out by {@code next()}. */
                    private WorkerResultWrapper current = null;
                    /** {@code true} while {@link #current} holds an element that was not consumed. */
                    private boolean prefetched = false;

                    @Override
                    public boolean hasNext() {
                        synchronized (LOCK) {
                            if (this.prefetched) {
                                // Repeated hasNext() calls must not move the cursor.
                                return true;
                            }
                            if (this.localItr == null) {
                                this.localItr = NettyMasterCoordinator.this.iterResults.iterator();
                            }
                            // Skip results that belong to another iteration.
                            while (this.localItr.hasNext()) {
                                WorkerResultWrapper candidate = this.localItr.next();
                                if (candidate.currIter == currentIter) {
                                    this.current = candidate;
                                    this.prefetched = true;
                                    return true;
                                }
                            }
                            // Exhausted: rewind so the iterator can be walked again.
                            this.localItr = NettyMasterCoordinator.this.iterResults.iterator();
                            return false;
                        }
                    }

                    @Override
                    @SuppressWarnings("unchecked")
                    public WORKER_RESULT next() {
                        synchronized (LOCK) {
                            if (!this.hasNext()) {
                                throw new java.util.NoSuchElementException();
                            }
                            WorkerResultWrapper result = this.current;
                            this.prefetched = false;
                            // The wrapper stores the result as Bytable; it was deserialized with
                            // the configured worker result class.
                            return (WORKER_RESULT)result.workerResult;
                        }
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        });
        LOG.info("Application {} container {} iteration {} waiting ends with {}ms execution time.", new Object[]{context.getAppId(), context.getContainerId(), context.getCurrentIteration(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)});
    }

    /**
     * Publishes the master result of the finished iteration to ZooKeeper, resets the collected
     * worker results for the next iteration and schedules deletion of an outdated master znode.
     *
     * <p>The znode written {@code guagua.cleanup.interval} (default 2) iterations ago is deleted
     * asynchronously on {@link #cleanOldZkDataThreadPool}. The iteration to delete and the
     * application id are captured before the task is submitted, so the task does not depend on
     * the state of {@code context} at the time it eventually runs.
     *
     * @param context master context of the iteration that just finished
     */
    @Override
    public void postIteration(final MasterContext<MASTER_RESULT, WORKER_RESULT> context) {
        context.setWorkerResults(null);
        new BasicCoordinator.BasicCoordinatorCommand(){

            @Override
            public void doExecute() throws KeeperException, InterruptedException {
                NettyMasterCoordinator.this.masterResult = context.getMasterResult();
                NettyMasterCoordinator.this.updateMasterHaltStatus(context);
                boolean isSplit = false;
                String appCurrentMasterNode = NettyMasterCoordinator.this.getCurrentMasterNode(context.getAppId(), context.getCurrentIteration()).toString();
                String appCurrentMasterSplitNode = NettyMasterCoordinator.this.getCurrentMasterSplitNode(context.getAppId(), context.getCurrentIteration()).toString();
                LOG.debug("master result:{}", context.getMasterResult());
                long start = System.nanoTime();
                try {
                    byte[] bytes = NettyMasterCoordinator.this.getMasterSerializer().objectToBytes(context.getMasterResult());
                    isSplit = NettyMasterCoordinator.this.setBytesToZNode(appCurrentMasterNode, appCurrentMasterSplitNode, bytes, CreateMode.PERSISTENT);
                    // NOTE(review): this reset is skipped when the znode already exists (catch
                    // below); confirm that leaving indexMap populated in that case is intended.
                    synchronized (LOCK) {
                        NettyMasterCoordinator.this.clear(context.getProps());
                        NettyMasterCoordinator.this.currentInteration = context.getCurrentIteration() + 1;
                        NettyMasterCoordinator.this.canUpdateWorkerResultMap = true;
                    }
                }
                catch (KeeperException.NodeExistsException e) {
                    LOG.warn("Has such node:", (Throwable)e);
                }
                LOG.info("set results to zookeeper with time {}ms", (Object)TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                final int resultCleanUpInterval = NumberFormatUtils.getInt(context.getProps().getProperty("guagua.cleanup.interval"), 2);
                if (context.getCurrentIteration() >= resultCleanUpInterval + 1) {
                    final boolean isLocalSplit = isSplit;
                    // Snapshot now: the pool thread may run after the context has advanced.
                    final String appId = context.getAppId();
                    final int expiredIteration = context.getCurrentIteration() - resultCleanUpInterval;
                    NettyMasterCoordinator.this.cleanOldZkDataThreadPool.submit(new Runnable(){

                        @Override
                        public void run() {
                            String znode = NettyMasterCoordinator.this.getMasterNode(appId, expiredIteration).toString();
                            try {
                                NettyMasterCoordinator.this.getZooKeeper().deleteExt(znode, -1, false);
                                if (isLocalSplit) {
                                    znode = NettyMasterCoordinator.this.getCurrentMasterSplitNode(appId, expiredIteration).toString();
                                    NettyMasterCoordinator.this.getZooKeeper().deleteExt(znode, -1, true);
                                }
                            }
                            catch (KeeperException.NoNodeException e) {
                                // Already gone; log only occasionally to keep the log quiet.
                                if (System.nanoTime() % 20L == 0L) {
                                    LOG.warn("No such node:{}", (Object)znode);
                                }
                            }
                            catch (KeeperException e) {
                                // Clean-up is best effort: report the failure but do not propagate it.
                                LOG.warn("Failed to clean up znode " + znode, (Throwable)e);
                            }
                            catch (InterruptedException e) {
                                // Keep the interrupt visible to the pool (shutdownNow in postApplication).
                                Thread.currentThread().interrupt();
                            }
                        }
                    });
                }
                LOG.debug("master results write to znode.");
            }
        }.execute();
    }

    /**
     * Tears the master down after the last iteration: optionally waits for the workers to
     * report in, deletes the application znode, shuts down the Netty server, closes the parent
     * coordinator and the result list, and finally stops the znode clean-up pool.
     *
     * @param context master context of the finished application
     */
    @Override
    public void postApplication(final MasterContext<MASTER_RESULT, WORKER_RESULT> context) {
        Object object = LOCK;
        synchronized (object) {
            this.currentInteration = context.getCurrentIteration();
        }
        new BasicCoordinator.BasicCoordinatorCommand(){

            /*
             * Waits for workers and removes the application znode when guagua.zk.cleanup.enable
             * is "true" (the default); the server and list clean-up in the finally block runs
             * in either case.
             */
            @Override
            public void doExecute() throws Exception, InterruptedException {
                try {
                    String zkCleanUpEnabled = StringUtils.get(context.getProps().getProperty("guagua.zk.cleanup.enable"), "true");
                    final int masterUnregisterTimeout = NumberFormatUtils.getInt(context.getProps().getProperty("guagua.master.unregister.wait.timeout", "200000"));
                    LOG.info("guagua master un register timeout is {}", (Object)masterUnregisterTimeout);
                    final long start = System.nanoTime();
                    if (Boolean.TRUE.toString().equalsIgnoreCase(zkCleanUpEnabled)) {
                        new BasicCoordinator.RetryCoordinatorCommand(NettyMasterCoordinator.this.isFixedTime(), NettyMasterCoordinator.this.getSleepTime()){

                            /*
                             * Stops retrying once the unregister timeout (milliseconds) has elapsed,
                             * or when isTerminated says so for the number of results received.
                             */
                            @Override
                            public boolean retryExecution() throws KeeperException, InterruptedException {
                                int doneWorkers;
                                Object object = LOCK;
                                synchronized (object) {
                                    doneWorkers = NettyMasterCoordinator.this.iterResults.size();
                                }
                                if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) > (long)masterUnregisterTimeout) {
                                    LOG.info("unregister step, worker(s) compelted: {}, still {} workers are not unregistered, but time out to terminate.", (Object)doneWorkers, (Object)(context.getWorkers() - doneWorkers));
                                    return true;
                                }
                                // Sampled progress log: only printed when the nano clock happens to be a multiple of 30.
                                if (System.nanoTime() % 30L == 0L) {
                                    LOG.info("unregister step, worker(s) compelted: {}, still {} workers are not unregistered.", (Object)doneWorkers, (Object)(context.getWorkers() - doneWorkers));
                                }
                                return this.isTerminated(doneWorkers, context.getWorkers(), 1.0, 60000L);
                            }
                        }.execute();
                        String appNode = NettyMasterCoordinator.this.getAppNode(context.getAppId()).toString();
                        try {
                            NettyMasterCoordinator.this.getZooKeeper().deleteExt(appNode, -1, true);
                        }
                        catch (KeeperException.NoNodeException e) {
                            // Node already removed; logged only occasionally.
                            if (System.nanoTime() % 20L == 0L) {
                                LOG.warn("No such node:{}", (Object)appNode);
                            }
                        }
                    }
                }
                finally {
                    // NOTE(review): if the reflective shutdown below throws, the remaining
                    // clean-up statements in this finally block are skipped.
                    if (NettyMasterCoordinator.this.messageServer != null) {
                        // "shutdown" is looked up reflectively and only invoked when a method is returned.
                        Method shutDownMethod = ReflectionUtils.getMethod(NettyMasterCoordinator.this.messageServer.getClass(), "shutdown");
                        if (shutDownMethod != null) {
                            shutDownMethod.invoke((Object)NettyMasterCoordinator.this.messageServer, (Object[])null);
                        }
                        NettyMasterCoordinator.this.messageServer.releaseExternalResources();
                    }
                    NettyMasterCoordinator.super.close();
                    NettyMasterCoordinator.this.closeIterResults();
                    NettyMasterCoordinator.this.iterResults.clear();
                }
            }
        }.execute();
        // Stop pending znode clean-up tasks and give running ones up to two seconds to finish.
        this.cleanOldZkDataThreadPool.shutdownNow();
        try {
            this.cleanOldZkDataThreadPool.awaitTermination(2L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * One worker result together with the iteration it belongs to and the name of its class.
     *
     * <p>Serialized layout (see {@link #write}): iteration as int, then the class name as
     * length-prefixed UTF-8 bytes, then the serialized result as length-prefixed bytes. A length
     * of 0 stands for "absent".
     */
    private static class WorkerResultWrapper
    implements Bytable,
    Combinable<WorkerResultWrapper> {
        /** Iteration the wrapped result was produced for. */
        private int currIter;
        /** The wrapped worker result; {@code null} for stop messages. */
        private Bytable workerResult;
        /** Class name of {@link #workerResult}; {@code null} when there is no result. */
        private String className;

        public WorkerResultWrapper(int currIter, Bytable workerResult, String className) {
            this.currIter = currIter;
            this.workerResult = workerResult;
            this.className = className;
        }

        /**
         * @return {@code true} if a result is present and its class (looked up by name)
         *         implements {@link Combinable}; {@code false} also when the class cannot be found
         */
        public boolean isWorkerCombinable() {
            try {
                return this.className != null && this.workerResult != null && Combinable.class.isAssignableFrom(Class.forName(this.className));
            }
            catch (ClassNotFoundException e) {
                return false;
            }
        }

        /**
         * Folds the result wrapped by {@code wrw} into this wrapper's result, if this wrapper is
         * combinable; otherwise does nothing.
         *
         * @param wrw wrapper whose result is merged into this one
         * @return this wrapper
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public WorkerResultWrapper combine(WorkerResultWrapper wrw) {
            if (this.isWorkerCombinable()) {
                Combinable cwr = (Combinable)((Object)this.workerResult);
                cwr.combine(wrw.workerResult);
            }
            return this;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.currIter);
            if (this.className == null) {
                out.writeInt(0);
            } else {
                this.writeBytes(out, this.className.getBytes(Charset.forName("UTF-8")));
            }
            if (this.workerResult == null) {
                out.writeInt(0);
            } else {
                byte[] wrBytes = serializer.objectToBytes(this.workerResult);
                this.writeBytes(out, wrBytes);
            }
        }

        /** Writes the array length followed by the bytes themselves in one bulk call. */
        private void writeBytes(DataOutput out, byte[] bytes) throws IOException {
            out.writeInt(bytes.length);
            out.write(bytes);
        }

        /**
         * Reads a byte array of the given length in one bulk call.
         *
         * @throws IOException if {@code length} is negative, or the stream ends early
         */
        private byte[] readBytes(DataInput in, int length) throws IOException {
            if (length < 0) {
                throw new IOException("Negative length in worker result stream: " + length);
            }
            byte[] bytes = new byte[length];
            in.readFully(bytes);
            return bytes;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.currIter = in.readInt();
            int classNameLen = in.readInt();
            if (classNameLen != 0) {
                this.className = new String(this.readBytes(in, classNameLen), Charset.forName("UTF-8"));
            } else {
                this.className = null;
            }
            int bytesSize = in.readInt();
            if (bytesSize != 0) {
                this.workerResult = serializer.bytesToObject(this.readBytes(in, bytesSize), this.className);
            } else {
                // Mirror the className handling: an absent payload must not leave a stale
                // result behind when this instance is reused for reading.
                this.workerResult = null;
            }
        }
    }

    private class ServerHandler
    extends SimpleChannelUpstreamHandler {
        private ServerHandler() {
        }

        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            if (e instanceof ChannelStateEvent && ((ChannelStateEvent)e).getState() != ChannelState.INTEREST_OPS) {
                LOG.debug(e.toString());
            }
            super.handleUpstream(ctx, e);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            if (!(e.getMessage() instanceof Bytable)) {
                throw new IllegalStateException("Message should be bytable instance.");
            }
            BytableWrapper bytableWrapper = (BytableWrapper)e.getMessage();
            LOG.debug("Received container id {} with message:{}", (Object)bytableWrapper.getContainerId(), (Object)bytableWrapper);
            LOG.debug("Received message size {}", (Object)(bytableWrapper != null && bytableWrapper.getBytes() != null ? bytableWrapper.getBytes().length : 0));
            String containerId = bytableWrapper.getContainerId();
            Object object = LOCK;
            synchronized (object) {
                if (!NettyMasterCoordinator.this.canUpdateWorkerResultMap) {
                    LOG.info("Cannot update worker result with message: containerId {} iteration {} currentIteration", new Object[]{containerId, bytableWrapper.getCurrentIteration(), NettyMasterCoordinator.this.currentInteration});
                    bytableWrapper.setBytes(null);
                    return;
                }
                if (bytableWrapper.isStopMessage()) {
                    Bytable masterResult = NettyMasterCoordinator.this.masterResult;
                    if ((bytableWrapper.getCurrentIteration() == NettyMasterCoordinator.this.totalInteration + 1 || masterResult instanceof HaltBytable && ((HaltBytable)masterResult).isHalt()) && !NettyMasterCoordinator.this.indexMap.containsKey(containerId)) {
                        WorkerResultWrapper wrw = new WorkerResultWrapper(bytableWrapper.getCurrentIteration(), null, null);
                        NettyMasterCoordinator.this.iterResults.append(wrw);
                        NettyMasterCoordinator.this.indexMap.put(containerId, NettyMasterCoordinator.this.iterResults.size() - 1);
                    }
                } else if (!NettyMasterCoordinator.this.indexMap.containsKey(containerId) && NettyMasterCoordinator.this.currentInteration == bytableWrapper.getCurrentIteration()) {
                    String clazzName = NettyMasterCoordinator.this.workerClassName;
                    Object wr = NettyMasterCoordinator.this.getWorkerSerializer().bytesToObject(bytableWrapper.getBytes(), clazzName);
                    bytableWrapper.setBytes(null);
                    WorkerResultWrapper wrw = new WorkerResultWrapper(bytableWrapper.getCurrentIteration(), (Bytable)wr, clazzName);
                    NettyMasterCoordinator.this.iterResults.append(wrw);
                    NettyMasterCoordinator.this.indexMap.put(containerId, NettyMasterCoordinator.this.iterResults.size() - 1);
                }
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            LOG.error("error in service handler", e.getCause());
            e.getChannel().close();
            Throwable cause = e.getCause();
            if (cause != null && cause instanceof GuaguaRuntimeException) {
                throw (GuaguaRuntimeException)cause;
            }
            throw new GuaguaRuntimeException(e.getCause());
        }
    }

    private static class MasterThreadFactory
    implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        MasterThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            this.group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(this.group, r, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            if (thread.getPriority() != 5) {
                thread.setPriority(5);
            }
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    LOG.warn("Error message in thread {} with error message {}, error root cause {}.", new Object[]{t, e, e.getCause()});
                }
            });
            return thread;
        }
    }

    /**
     * In-memory result list that periodically combines combinable worker results so that only a
     * few merged elements are kept.
     *
     * <p>{@link #size()} reports the number of results ever added (the raw count), whereas
     * {@link #mergedSize()} reports how many elements are physically stored.
     */
    private class MergeWorkerResultList
    extends LinkedList<WorkerResultWrapper>
    implements AppendList<WorkerResultWrapper> {
        private static final long serialVersionUID = -33662960498334446L;
        /** Number of results added since construction or the last {@link #clear()}. */
        private int rawSize;
        /** Merge trigger supplied by the caller; always positive. */
        private final int threshold;
        /** Number of combinable elements appended since the last merge (1 right after a merge). */
        private int currIndex = 0;

        /**
         * @param threshold merge trigger, must be positive
         * @throws IllegalArgumentException if {@code threshold} is zero or negative
         */
        public MergeWorkerResultList(int threshold) {
            if (threshold <= 0) {
                throw new IllegalArgumentException("Threshold cannot be <= 0.");
            }
            this.threshold = threshold;
        }

        /**
         * Adds a result. When the element is combinable and the number of combinable elements
         * added since the last merge reaches {@code threshold - 1}, the trailing elements are
         * removed and combined into {@code e} before it is stored.
         *
         * <p>The trigger uses {@code >=} rather than {@code ==}: with a threshold of 1 or 2 the
         * counter passes {@code threshold - 1} on the first add without anything to merge, and
         * an equality test would then never fire again.
         */
        @Override
        public synchronized boolean add(WorkerResultWrapper e) {
            ++this.rawSize;
            if (e.isWorkerCombinable()) {
                ++this.currIndex;
                if (this.currIndex >= this.threshold - 1) {
                    // Fold the elements appended since the last merge into the new one;
                    // afterwards exactly one (merged) element counts towards the next round.
                    while (this.currIndex > 1) {
                        e.combine(this.removeLast());
                        --this.currIndex;
                    }
                }
            }
            return super.add(e);
        }

        /** @return number of results added, including those that were merged away */
        @Override
        public synchronized int size() {
            return this.rawSize;
        }

        /** @return number of elements physically stored after merging */
        public synchronized int mergedSize() {
            return super.size();
        }

        /** Empties the list and resets the raw count and the merge counter with it. */
        @Override
        public synchronized void clear() {
            super.clear();
            this.rawSize = 0;
            this.currIndex = 0;
        }

        @Override
        public synchronized boolean append(WorkerResultWrapper t) {
            return this.add(t);
        }

        /** No state to switch for a purely in-memory list. */
        @Override
        public synchronized void switchState() {
        }

        /** Nothing to release; present so the reflective close in the coordinator finds a method. */
        public synchronized void close() {
        }
    }
}

