/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.blocks.executor.ExecutionService;
import org.jgroups.blocks.executor.ExecutorNotification;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;

@MBean(description="Based class for executor service functionality")
public abstract class Executing
extends Protocol {
    // When set, sendRequest() sets flag (byte)2 on every outgoing message.
    @Property(description="bypasses message bundling if set")
    protected boolean bypass_bundling = true;
    // Address of this member; assigned in down() when event type 8 is seen.
    protected Address local_addr;
    // Most recent view passed to handleView().
    protected View view;
    // Locally submitted tasks that have not yet been matched with a consumer.
    protected final Queue<Runnable> _awaitingConsumer = new ConcurrentLinkedQueue<Runnable>();
    // Request id assigned to each locally submitted task.
    protected final ConcurrentMap<Runnable, Long> _requestId = new ConcurrentHashMap<Runnable, Long>();
    // Thread id -> placeholder, present while that thread is blocked taking a task in down().
    protected final ConcurrentMap<Long, Object> _consumerId = new ConcurrentHashMap<Long, Object>();
    // Callbacks registered through addExecutorListener(), keyed by the task's future.
    protected final ConcurrentMap<Future<?>, ExecutorNotification> notifiers = new ConcurrentHashMap();
    // Tasks accepted for local execution, mapped to the Owner (address + request id) that submitted them.
    protected final Map<Runnable, Owner> _running;
    // Tasks handed to a consumer, keyed by Owner(consumer address, request id), until a result or rejection arrives.
    protected final Map<Owner, Runnable> _awaitingReturn;
    // Hand-off point between handleTaskSubmittedRequest() (offer) and consumer threads (take).
    protected BlockingQueue<Runnable> _tasks = new SynchronousQueue<Runnable>();
    // Thread that took each task; used by handleInterruptRequest() to interrupt it.
    protected final ConcurrentMap<Runnable, Thread> _runnableThreads = new ConcurrentHashMap<Runnable, Thread>();
    // Held around accesses to _runRequests and _consumersAvailable.
    protected Lock _consumerLock = new ReentrantLock();
    // Run requests for which no consumer was available when they arrived.
    protected Queue<Owner> _runRequests = new ArrayDeque<Owner>();
    // Consumers that reported ready while no run request was pending.
    protected Queue<Owner> _consumersAvailable = new ArrayDeque<Owner>();
    // Source of request ids for submitted tasks (shared by all instances in this JVM).
    protected static final AtomicLong counter = new AtomicLong();

    /**
     * Creates the protocol and initialises the two maps that are wrapped with
     * {@link Collections#synchronizedMap(Map)}: the locally running tasks and
     * the tasks whose result is still outstanding.
     */
    public Executing() {
        this._running = Collections.synchronizedMap(new HashMap<Runnable, Owner>());
        this._awaitingReturn = Collections.synchronizedMap(new HashMap<Owner, Runnable>());
    }

    /**
     * @return the current value of the {@code bypass_bundling} property
     */
    public boolean getBypassBundling() {
        return bypass_bundling;
    }

    /**
     * Sets the {@code bypass_bundling} property, which {@code sendRequest}
     * consults before flagging outgoing messages.
     *
     * @param bypass_bundling the new value
     */
    public void setBypassBundling(boolean bypass_bundling) {
        // The parameter shadows the field, so the qualifier is required here.
        this.bypass_bundling = bypass_bundling;
    }

    /**
     * Registers a callback for the given future. A {@code null} listener is
     * ignored.
     *
     * @param future   the future the listener is registered under
     * @param listener the callback to register, may be {@code null}
     */
    public void addExecutorListener(Future<?> future, ExecutorNotification listener) {
        if (listener == null) {
            return;
        }
        this.notifiers.put(future, listener);
    }

    /**
     * @return the string form of the local address, or {@code null} if it
     *         has not been set yet
     */
    @ManagedAttribute
    public String getAddress() {
        if (this.local_addr == null) {
            return null;
        }
        return this.local_addr.toString();
    }

    /**
     * @return the string form of the current view, or {@code null} if no
     *         view has been received yet
     */
    @ManagedAttribute
    public String getView() {
        if (this.view == null) {
            return null;
        }
        return this.view.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles events travelling down the stack. The numeric event types are
     * the inlined constants left by the decompiler; the descriptions below
     * are derived from what each branch does.
     * <ul>
     * <li>1024 - submit a task: assigns a request id, queues the task and asks
     *     the coordinator for a consumer.</li>
     * <li>1025 - take a task: announces the calling thread as a consumer and
     *     blocks until a task is handed over; returns that task.</li>
     * <li>1026 - task finished: sends the result (or exception) back to the
     *     owner of the task, or handles it locally if this member is the owner.</li>
     * <li>1027 - cancel one task; returns {@code Boolean.TRUE} or
     *     {@code Boolean.FALSE}.</li>
     * <li>1028 - cancel a set of tasks; returns the list of tasks that were
     *     not handed over to the interrupt path.</li>
     * <li>8 - records the local address; 6 - processes a new view.</li>
     * </ul>
     * Every branch that does not return explicitly passes the event on to the
     * protocol below.
     *
     * @param evt the event to process
     * @return the branch-specific value described above, otherwise the result
     *         of the protocol below
     */
    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 1024: {
                Runnable runnable = (Runnable)evt.getArg();
                long requestId = Math.abs(counter.getAndIncrement());
                // Record the id BEFORE the task becomes visible in the queue:
                // handleConsumerFoundResponse() polls the queue and then looks the
                // id up, so the reverse order could expose a task without an id.
                this._requestId.put(runnable, requestId);
                this._awaitingConsumer.add(runnable);
                this.sendToCoordinator(Type.RUN_REQUEST, requestId, this.local_addr);
                break;
            }
            case 1025: {
                Thread currentThread = Thread.currentThread();
                long id = currentThread.getId();
                this._consumerId.put(id, new Object());
                this.sendToCoordinator(Type.CONSUMER_READY, id, this.local_addr);
                try {
                    Runnable runnable = this._tasks.take();
                    // Remember who runs the task so that it can be interrupted later.
                    this._runnableThreads.put(runnable, currentThread);
                    return runnable;
                }
                catch (InterruptedException e) {
                    // Withdraw the readiness announcement and keep the interrupt status.
                    this.sendToCoordinator(Type.CONSUMER_UNREADY, id, this.local_addr);
                    Thread.currentThread().interrupt();
                    break;
                }
                finally {
                    this._consumerId.remove(id);
                }
            }
            case 1026: {
                Runnable runnable;
                Object arg = evt.getArg();
                Throwable throwable = null;
                // The argument is either the task itself or {task, throwable}.
                if (arg instanceof Object[]) {
                    Object[] array = (Object[])arg;
                    runnable = (Runnable)array[0];
                    throwable = (Throwable)array[1];
                } else {
                    runnable = (Runnable)arg;
                }
                // owner is null when the entry was already removed, e.g. by
                // handleInterruptRequest().
                Owner owner = this._running.remove(runnable);
                this._runnableThreads.remove(runnable);
                // The result of a task can be any object, not only a Throwable.
                Object value = null;
                boolean exception = false;
                if (throwable != null) {
                    if (throwable instanceof InterruptedException) {
                        // The task was not run to completion: hand it back as rejected,
                        // provided we still know whom to tell.
                        if (owner != null) {
                            this.sendRequest(owner.address, Type.RUN_REJECTED, owner.requestId, null);
                        } else if (this.log.isTraceEnabled()) {
                            this.log.trace("Could not return result - most likely because it was interrupted");
                        }
                        break;
                    }
                    value = throwable;
                    exception = true;
                } else if (runnable instanceof RunnableFuture) {
                    RunnableFuture<?> future = (RunnableFuture<?>)runnable;
                    boolean interrupted = false;
                    boolean gotValue = false;
                    // Retry until the outcome is available; restore the interrupt
                    // status afterwards instead of losing it.
                    while (!gotValue) {
                        try {
                            value = future.get();
                            gotValue = true;
                        }
                        catch (InterruptedException e) {
                            interrupted = true;
                        }
                        catch (ExecutionException e) {
                            value = e.getCause();
                            exception = true;
                            gotValue = true;
                        }
                    }
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (owner != null) {
                    Object valueToSend;
                    Type type;
                    if (value == null) {
                        type = Type.RESULT_SUCCESS;
                        valueToSend = value;
                    } else if (value instanceof Serializable || value instanceof Externalizable || value instanceof Streamable) {
                        type = exception ? Type.RESULT_EXCEPTION : Type.RESULT_SUCCESS;
                        valueToSend = value;
                    } else {
                        // The value cannot be marshalled; report that instead.
                        type = Type.RESULT_EXCEPTION;
                        valueToSend = new NotSerializableException(value.getClass().getName());
                    }
                    if (this.local_addr.equals(owner.getAddress())) {
                        // The task was submitted by this member: short-circuit the network.
                        if (this.log.isTraceEnabled()) {
                            this.log.trace("[redirect] <--> [" + this.local_addr + "] " + type.name() + " [" + value + (owner.requestId != -1L ? " request id: " + owner.requestId : "") + "]");
                        }
                        if (type == Type.RESULT_SUCCESS) {
                            this.handleValueResponse(this.local_addr, owner.requestId, valueToSend);
                        } else if (type == Type.RESULT_EXCEPTION) {
                            // With RESULT_EXCEPTION the payload is always a Throwable:
                            // the task's failure or the NotSerializableException above.
                            this.handleExceptionResponse(this.local_addr, owner.requestId, (Throwable)valueToSend);
                        }
                        break;
                    }
                    this.sendRequest(owner.getAddress(), type, owner.requestId, valueToSend);
                    break;
                }
                if (!this.log.isTraceEnabled()) break;
                this.log.trace("Could not return result - most likely because it was interrupted");
                break;
            }
            case 1027: {
                // Argument layout: {task, interrupt-if-running flag}.
                Object[] array = (Object[])evt.getArg();
                Runnable runnable = (Runnable)array[0];
                if (this._awaitingConsumer.remove(runnable)) {
                    this._requestId.remove(runnable);
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("Cancelled task " + runnable + " before it was picked up");
                    }
                    return Boolean.TRUE;
                }
                // equals() instead of ==, so a Boolean that is not the cached
                // Boolean.TRUE instance is honoured as well.
                if (Boolean.TRUE.equals(array[1])) {
                    Owner owner = Executing.removeKeyForValue(this._awaitingReturn, runnable);
                    if (owner != null) {
                        Long requestIdValue = this._requestId.remove(runnable);
                        if (requestIdValue != null) {
                            if (requestIdValue.longValue() != owner.getRequestId()) {
                                this.log.warn("Cancelling requestId didn't match waiting");
                            }
                            this.sendRequest(owner.getAddress(), Type.INTERRUPT_RUN, owner.getRequestId(), null);
                        }
                    } else if (this.log.isTraceEnabled()) {
                        // NOTE(review): guarded by the trace level but logged at warn.
                        this.log.warn("Couldn't interrupt server task: " + runnable);
                    }
                    ExecutorNotification notification = this.notifiers.remove(runnable);
                    if (notification != null) {
                        notification.interrupted(runnable);
                    }
                    return Boolean.TRUE;
                }
                return Boolean.FALSE;
            }
            case 1028: {
                // Argument layout: {set of tasks, interrupt-if-running flag}.
                Object[] array = (Object[])evt.getArg();
                @SuppressWarnings("unchecked")
                Set<Runnable> runnables = (Set<Runnable>)array[0];
                Boolean booleanValue = (Boolean)array[1];
                boolean interrupt = Boolean.TRUE.equals(booleanValue);
                ArrayList<Runnable> notRan = new ArrayList<Runnable>();
                for (Runnable cancelRunnable : runnables) {
                    if (!this._awaitingConsumer.remove(cancelRunnable) && interrupt) {
                        synchronized (this._awaitingReturn) {
                            Owner owner = Executing.removeKeyForValue(this._awaitingReturn, cancelRunnable);
                            if (owner != null) {
                                Long requestIdValue = this._requestId.remove(cancelRunnable);
                                // Same null guard as the single-task cancel above;
                                // without it a missing id caused a NullPointerException.
                                if (requestIdValue != null) {
                                    if (requestIdValue.longValue() != owner.getRequestId()) {
                                        this.log.warn("Cancelling requestId didn't match waiting");
                                    }
                                    this.sendRequest(owner.getAddress(), Type.INTERRUPT_RUN, owner.getRequestId(), null);
                                }
                            }
                            ExecutorNotification notification = this.notifiers.remove(cancelRunnable);
                            if (notification != null) {
                                this.log.trace("Notifying listener");
                                notification.interrupted(cancelRunnable);
                            }
                            continue;
                        }
                    }
                    this._requestId.remove(cancelRunnable);
                    notRan.add(cancelRunnable);
                }
                return notRan;
            }
            case 8: {
                this.local_addr = (Address)evt.getArg();
                break;
            }
            case 6: {
                this.handleView((View)evt.getArg());
            }
        }
        return this.down_prot.down(evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the first entry (in iteration order) whose value equals
     * {@code value} and returns that entry's key. The whole scan runs while
     * holding the monitor of {@code map}.
     * <p>
     * Note the type parameter naming: {@code V} is the map's <em>key</em> type
     * and {@code K} its <em>value</em> type. Null values in the map are
     * tolerated; a null {@code value} matches only a null map value.
     *
     * @param map   the map to search and modify
     * @param value the value to look for
     * @return the key of the removed entry, or {@code null} if no entry matched
     */
    protected static <V, K> V removeKeyForValue(Map<V, K> map, K value) {
        synchronized (map) {
            Iterator<Map.Entry<V, K>> iter = map.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<V, K> entry = iter.next();
                K candidate = entry.getValue();
                // Null-safe comparison: a null value stored in the map must not
                // abort the scan with a NullPointerException.
                boolean matches = candidate == null ? value == null : candidate.equals(value);
                if (!matches) {
                    continue;
                }
                // Read the key before removing the entry from the backing map.
                V key = entry.getKey();
                iter.remove();
                return key;
            }
        }
        return null;
    }

    /**
     * Handles events travelling up the stack.
     * <p>
     * A message event (type 1) that carries an {@link ExecutorHeader} is
     * consumed here: its {@link Request} payload is dispatched to the matching
     * handler and {@code null} is returned. Messages without the header, view
     * events (type 6, after updating local state) and all other events are
     * passed to the protocol above.
     *
     * @param evt the event to process
     * @return {@code null} for consumed executor messages, otherwise the
     *         result of the protocol above
     */
    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 1: {
                Message msg = (Message)evt.getArg();
                ExecutorHeader hdr = (ExecutorHeader)msg.getHeader(this.id);
                // Not one of ours: let it travel further up.
                if (hdr == null) break;
                Request req = (Request)msg.getObject();
                if (this.log.isTraceEnabled()) {
                    this.log.trace("[" + this.local_addr + "] <-- [" + msg.getSrc() + "] " + req);
                }
                switch (req.type) {
                    case RUN_REQUEST: {
                        this.handleTaskRequest(req.request, (Address)req.object);
                        break;
                    }
                    case CONSUMER_READY: {
                        this.handleConsumerReadyRequest(req.request, (Address)req.object);
                        break;
                    }
                    case CONSUMER_UNREADY: {
                        this.handleConsumerUnreadyRequest(req.request, (Address)req.object);
                        break;
                    }
                    case CONSUMER_FOUND: {
                        this.handleConsumerFoundResponse(req.request, (Address)req.object);
                        break;
                    }
                    case RUN_SUBMITTED: {
                        Runnable runnable;
                        Object objectToRun = req.object;
                        if (objectToRun instanceof Runnable) {
                            // Any Runnable is acceptable; casting to FutureTask here
                            // would fail for the plain runnables that
                            // handleConsumerFoundResponse() sends.
                            runnable = (Runnable)objectToRun;
                        } else if (objectToRun instanceof Callable) {
                            @SuppressWarnings("unchecked")
                            Callable<Object> callable = (Callable<Object>)objectToRun;
                            // Wrap the callable so that its result can be fetched
                            // once the task has run.
                            runnable = new FutureTask<Object>(callable);
                        } else {
                            this.log.error("Request of type " + req.type + " sent an object of " + objectToRun + " which is invalid");
                            break;
                        }
                        this.handleTaskSubmittedRequest(runnable, msg.getSrc(), req.request);
                        break;
                    }
                    case RUN_REJECTED: {
                        this.handleTaskRejectedResponse(msg.getSrc(), req.request);
                        break;
                    }
                    case RESULT_SUCCESS: {
                        this.handleValueResponse(msg.getSrc(), req.request, req.object);
                        break;
                    }
                    case RESULT_EXCEPTION: {
                        this.handleExceptionResponse(msg.getSrc(), req.request, (Throwable)req.object);
                        break;
                    }
                    case INTERRUPT_RUN: {
                        this.handleInterruptRequest(msg.getSrc(), req.request);
                        break;
                    }
                    case CREATE_CONSUMER_READY: {
                        Owner owner = new Owner((Address)req.object, req.request);
                        this.handleNewConsumer(owner);
                        break;
                    }
                    case CREATE_RUN_REQUEST: {
                        Owner owner = new Owner((Address)req.object, req.request);
                        this.handleNewRunRequest(owner);
                        break;
                    }
                    case DELETE_CONSUMER_READY: {
                        Owner owner = new Owner((Address)req.object, req.request);
                        this.handleRemoveConsumer(owner);
                        break;
                    }
                    case DELETE_RUN_REQUEST: {
                        Owner owner = new Owner((Address)req.object, req.request);
                        this.handleRemoveRunRequest(owner);
                        break;
                    }
                    default: {
                        this.log.error("Request of type " + req.type + " not known");
                    }
                }
                // Executor messages are never delivered to the layers above.
                return null;
            }
            case 6: {
                this.handleView((View)evt.getArg());
            }
        }
        return this.up_prot.up(evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores the new view and cleans up state that refers to members which
     * are no longer part of it:
     * <ul>
     * <li>available consumers of departed members are dropped and a remove
     *     request is sent for each;</li>
     * <li>pending run requests of departed members are dropped likewise;</li>
     * <li>tasks that were handed to a departed consumer are put back into the
     *     local queue and a new run request is sent to the coordinator.</li>
     * </ul>
     * Everything after the view is stored runs while holding
     * {@code _consumerLock}.
     *
     * @param view the newly installed view
     */
    protected void handleView(View view) {
        this.view = view;
        if (this.log.isDebugEnabled()) {
            this.log.debug("view=" + view);
        }
        Vector<Address> members = view.getMembers();
        this._consumerLock.lock();
        try {
            Iterator<Owner> consumers = this._consumersAvailable.iterator();
            while (consumers.hasNext()) {
                Owner consumer = consumers.next();
                if (members.contains(consumer.getAddress())) continue;
                consumers.remove();
                this.sendRemoveConsumerRequest(consumer);
            }
            Iterator<Owner> requests = this._runRequests.iterator();
            while (requests.hasNext()) {
                Owner requester = requests.next();
                if (members.contains(requester.getAddress())) continue;
                requests.remove();
                this.sendRemoveRunRequest(requester);
            }
            // A map from Collections.synchronizedMap must be traversed while
            // holding its monitor. Take a snapshot under the monitor and work on
            // the copy, so that no message is sent while the monitor is held and
            // concurrent updates cannot break the iteration.
            Map<Owner, Runnable> outstanding;
            synchronized (this._awaitingReturn) {
                outstanding = new HashMap<Owner, Runnable>(this._awaitingReturn);
            }
            for (Map.Entry<Owner, Runnable> entry : outstanding.entrySet()) {
                Owner consumer = entry.getKey();
                if (members.contains(consumer.getAddress())) continue;
                // The consumer is gone: ask for a new one and re-queue the task.
                this.sendToCoordinator(Type.RUN_REQUEST, consumer.getRequestId(), consumer.getAddress());
                Runnable runnable = entry.getValue();
                this._requestId.put(runnable, consumer.getRequestId());
                this._awaitingConsumer.add(runnable);
            }
        }
        finally {
            this._consumerLock.unlock();
        }
    }

    /**
     * Sends a request of the given type together with a request id and an
     * address. Within this class it is invoked with the types RUN_REQUEST,
     * CONSUMER_READY and CONSUMER_UNREADY. How the coordinator is reached is
     * left to the subclass (not visible here).
     */
    protected abstract void sendToCoordinator(Type var1, long var2, Address var4);

    /**
     * Invoked by handleTaskRequest() when no consumer was available for the
     * given run request. Presumably announces the queued request to the other
     * members - confirm against the implementing subclass.
     */
    protected abstract void sendNewRunRequest(Owner var1);

    /**
     * Invoked when a run request has been matched with a consumer or when its
     * owner has left the view. Presumably tells the other members to drop the
     * request - confirm against the implementing subclass.
     */
    protected abstract void sendRemoveRunRequest(Owner var1);

    /**
     * Invoked by handleConsumerReadyRequest() when no run request was pending
     * for the given consumer. Presumably announces the consumer to the other
     * members - confirm against the implementing subclass.
     */
    protected abstract void sendNewConsumerRequest(Owner var1);

    /**
     * Invoked when a consumer has been matched with a run request, has become
     * unready, or has left the view. Presumably tells the other members to
     * drop the consumer - confirm against the implementing subclass.
     */
    protected abstract void sendRemoveConsumerRequest(Owner var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes a run request. If a consumer is available it is taken out of
     * the queue, the requester is told about it (CONSUMER_FOUND) and a remove
     * request for the consumer is sent. Otherwise the request is queued
     * (unless already present) and announced through
     * {@code sendNewRunRequest}.
     *
     * @param requestId id of the run request
     * @param address   address of the member that wants a task to be run
     */
    protected void handleTaskRequest(long requestId, Address address) {
        Owner requester = new Owner(address, requestId);
        Owner matched;
        this._consumerLock.lock();
        try {
            matched = this._consumersAvailable.poll();
            if (matched == null) {
                // Nobody is free right now: remember the request once.
                if (!this._runRequests.contains(requester)) {
                    this._runRequests.add(requester);
                }
            }
        }
        finally {
            this._consumerLock.unlock();
        }
        // Messages are sent only after the lock has been released.
        if (matched == null) {
            this.sendNewRunRequest(requester);
            return;
        }
        this.sendRequest(requester.getAddress(), Type.CONSUMER_FOUND, matched.getRequestId(), matched.getAddress());
        this.sendRemoveConsumerRequest(matched);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes a consumer's readiness announcement. If a run request is
     * pending it is taken out of the queue, its owner is told about the
     * consumer (CONSUMER_FOUND) and a remove request for the run request is
     * sent. Otherwise the consumer is queued (unless already present) and
     * announced through {@code sendNewConsumerRequest}.
     *
     * @param requestId id supplied by the consumer
     * @param address   address of the consumer
     */
    protected void handleConsumerReadyRequest(long requestId, Address address) {
        Owner consumer = new Owner(address, requestId);
        Owner pending;
        this._consumerLock.lock();
        try {
            pending = this._runRequests.poll();
            if (pending == null) {
                // Nothing to run right now: remember the consumer once.
                if (!this._consumersAvailable.contains(consumer)) {
                    this._consumersAvailable.add(consumer);
                }
            }
        }
        finally {
            this._consumerLock.unlock();
        }
        // Messages are sent only after the lock has been released.
        if (pending == null) {
            this.sendNewConsumerRequest(consumer);
            return;
        }
        this.sendRequest(pending.getAddress(), Type.CONSUMER_FOUND, consumer.getRequestId(), consumer.getAddress());
        this.sendRemoveRunRequest(pending);
    }

    /**
     * Processes a consumer's withdrawal: removes it from the queue of
     * available consumers and sends a remove request for it.
     *
     * @param requestId id supplied by the consumer
     * @param address   address of the consumer
     */
    protected void handleConsumerUnreadyRequest(long requestId, Address address) {
        Owner consumer = new Owner(address, requestId);
        // _consumersAvailable is a plain ArrayDeque; every other access to it
        // holds _consumerLock, so this removal must hold it as well.
        this._consumerLock.lock();
        try {
            this._consumersAvailable.remove(consumer);
        }
        finally {
            this._consumerLock.unlock();
        }
        this.sendRemoveConsumerRequest(consumer);
    }

    /**
     * Reacts to the announcement that a consumer is available. The next
     * waiting task is recorded as awaiting its result from that consumer and
     * handed over: run directly if the consumer is this member, otherwise
     * sent as RUN_SUBMITTED (the callable of a distributed future, or the
     * runnable itself). If no task is waiting, the consumer is reported back
     * as ready.
     *
     * @param request id supplied by the consumer
     * @param address address of the consumer
     */
    protected void handleConsumerFoundResponse(long request, Address address) {
        Runnable task = this._awaitingConsumer.poll();
        if (task == null) {
            // Nothing left to run: give the consumer back.
            this.sendToCoordinator(Type.CONSUMER_READY, request, address);
            return;
        }
        // From here on the task's own request id is used, not the consumer's id.
        long taskId = this._requestId.get(task);
        this._awaitingReturn.put(new Owner(address, taskId), task);
        if (this.local_addr.equals(address)) {
            this.handleTaskSubmittedRequest(task, this.local_addr, taskId);
            return;
        }
        Object payload;
        if (task instanceof ExecutionService.DistributedFuture) {
            payload = ((ExecutionService.DistributedFuture)task).getCallable();
        } else {
            payload = task;
        }
        this.sendRequest(address, Type.RUN_SUBMITTED, taskId, payload);
    }

    /**
     * Accepts a task for local execution. The task is registered as running
     * on behalf of its owner and offered to a waiting consumer thread. If no
     * consumer takes it (timeout or interruption), the owner is told that the
     * run was rejected and the registration is undone.
     *
     * @param runnable  the task to run
     * @param source    address of the member that submitted the task
     * @param requestId request id of the task
     */
    protected void handleTaskSubmittedRequest(Runnable runnable, Address source, long requestId) {
        // Registered first, so that the result can be routed back to the owner.
        this._running.put(runnable, new Owner(source, requestId));
        boolean received = false;
        try {
            // NOTE(review): the timeout is 1000 SECONDS - confirm this is intended.
            received = this._tasks.offer(runnable, 1000L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Report through the protocol's logger rather than standard output.
            this.log.warn("Interrupted while handing off");
        }
        if (!received) {
            this.sendRequest(source, Type.RUN_REJECTED, requestId, null);
            this._running.remove(runnable);
        }
    }

    /**
     * Reacts to a consumer rejecting a task: the task is put back into the
     * queue of tasks waiting for a consumer and a new run request is sent to
     * the coordinator. An error is logged if no task was outstanding for the
     * given consumer and request id.
     *
     * @param source    address of the consumer that rejected the task
     * @param requestId request id of the rejected task
     */
    protected void handleTaskRejectedResponse(Address source, long requestId) {
        Runnable task = this._awaitingReturn.remove(new Owner(source, requestId));
        if (task == null) {
            this.log.error("error resubmitting task for request-id: " + requestId);
            return;
        }
        this._awaitingConsumer.add(task);
        Long registeredId = this._requestId.get(task);
        if (registeredId.longValue() != requestId) {
            this.log.warn("Task Request Id doesn't match in rejection");
        }
        this.sendToCoordinator(Type.RUN_REQUEST, registeredId.longValue(), this.local_addr);
    }

    /**
     * Processes a successful result. The outstanding task is removed from the
     * bookkeeping and, if it is a {@link RunnableFuture} with a registered
     * listener, the listener receives the value. A warning is logged when no
     * such future is found.
     *
     * @param source    address of the consumer that ran the task
     * @param requestId request id of the task
     * @param value     the result produced by the task
     */
    protected void handleValueResponse(Address source, long requestId, Object value) {
        Runnable task = this._awaitingReturn.remove(new Owner(source, requestId));
        if (task != null) {
            this._requestId.remove(task);
        }
        // Also covers task == null, since instanceof is false for null.
        if (!(task instanceof RunnableFuture)) {
            this.log.warn("Runnable was not found in awaiting");
            return;
        }
        ExecutorNotification listener = this.notifiers.remove(task);
        if (listener != null) {
            listener.resultReturned(value);
        }
    }

    /**
     * Processes a failed result. The outstanding task is removed from the
     * bookkeeping and, if it is a {@link RunnableFuture} with a registered
     * listener, the listener receives the throwable. Otherwise the throwable
     * is logged as an error.
     *
     * @param source    address of the consumer that ran the task
     * @param requestId request id of the task
     * @param throwable the failure reported for the task
     */
    protected void handleExceptionResponse(Address source, long requestId, Throwable throwable) {
        Runnable task = this._awaitingReturn.remove(new Owner(source, requestId));
        if (task != null) {
            this._requestId.remove(task);
        }
        // Also covers task == null, since instanceof is false for null.
        if (!(task instanceof RunnableFuture)) {
            this.log.error("Runtime Error encountered from Cluster execute(Runnable) method", throwable);
            return;
        }
        ExecutorNotification listener = this.notifiers.remove(task);
        if (listener != null) {
            listener.throwableEncountered(throwable);
        }
    }

    /**
     * Processes a request to interrupt a task running on this member. The
     * task is removed from the running tasks and the thread that took it, if
     * any, is interrupted.
     *
     * @param source    address of the member that owns the task
     * @param requestId request id of the task
     */
    protected void handleInterruptRequest(Address source, long requestId) {
        Owner owner = new Owner(source, requestId);
        Runnable runnable = Executing.removeKeyForValue(this._running, owner);
        if (runnable == null) {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Message could not be interrupted due to it already returned");
            }
            return;
        }
        // A task is registered as running before a consumer thread takes it, so
        // there may be no thread to interrupt yet.
        Thread thread = this._runnableThreads.remove(runnable);
        if (thread != null) {
            thread.interrupt();
        } else if (this.log.isTraceEnabled()) {
            this.log.trace("Task could not be interrupted because no thread has picked it up yet");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a pending run request unless it is already queued. Runs under
     * {@code _consumerLock}.
     *
     * @param sender owner of the run request
     */
    protected void handleNewRunRequest(Owner sender) {
        final Lock lock = this._consumerLock;
        lock.lock();
        try {
            if (this._runRequests.contains(sender)) {
                return;
            }
            this._runRequests.add(sender);
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drops a pending run request, if present. Runs under
     * {@code _consumerLock}.
     *
     * @param sender owner of the run request
     */
    protected void handleRemoveRunRequest(Owner sender) {
        final Lock lock = this._consumerLock;
        lock.lock();
        try {
            this._runRequests.remove(sender);
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records an available consumer unless it is already queued. Runs under
     * {@code _consumerLock}.
     *
     * @param sender the consumer
     */
    protected void handleNewConsumer(Owner sender) {
        final Lock lock = this._consumerLock;
        lock.lock();
        try {
            if (this._consumersAvailable.contains(sender)) {
                return;
            }
            this._consumersAvailable.add(sender);
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drops an available consumer, if present. Runs under
     * {@code _consumerLock}.
     *
     * @param sender the consumer
     */
    protected void handleRemoveConsumer(Owner sender) {
        final Lock lock = this._consumerLock;
        lock.lock();
        try {
            this._consumersAvailable.remove(sender);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the arguments in a {@link Request}, puts it into a message tagged
     * with an {@link ExecutorHeader} and passes that message to the protocol
     * below. Failures while sending are logged, not propagated.
     *
     * @param dest      destination address; {@code null} is traced as "ALL"
     * @param type      the request type
     * @param requestId the request id to transmit
     * @param object    optional payload, may be {@code null}
     */
    protected void sendRequest(Address dest, Type type, long requestId, Object object) {
        Request request = new Request(type, object, requestId);
        Message message = new Message(dest, null, request);
        message.putHeader(this.id, new ExecutorHeader());
        if (this.bypass_bundling) {
            // Flag value 2 is what the bypass_bundling property switches on.
            message.setFlag((byte)2);
        }
        if (this.log.isTraceEnabled()) {
            Object target = dest == null ? "ALL" : dest;
            this.log.trace("[" + this.local_addr + "] --> [" + target + "] " + request);
        }
        try {
            Event sendEvent = new Event(1, message);
            this.down_prot.down(sendEvent);
        }
        catch (Exception ex) {
            this.log.error("failed sending " + type + " request: " + ex);
        }
    }

    /**
     * Immutable pair of a member address and a request id. Two owners are
     * equal when both parts are equal, which makes the class usable as a map
     * key and as a queue element that can be looked up again.
     */
    public static class Owner {
        protected final Address address;
        protected final long requestId;

        /**
         * @param address   the member address, may be {@code null}
         * @param requestId the request id
         */
        public Owner(Address address, long requestId) {
            this.address = address;
            this.requestId = requestId;
        }

        /** @return the member address */
        public Address getAddress() {
            return address;
        }

        /** @return the request id */
        public long getRequestId() {
            return requestId;
        }

        @Override
        public int hashCode() {
            int addressHash = address == null ? 0 : address.hashCode();
            // Fold the 64-bit id into 32 bits.
            int idHash = (int)(requestId ^ (requestId >>> 32));
            return 31 * (31 + addressHash) + idHash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            Owner that = (Owner)obj;
            boolean sameAddress = address == null ? that.address == null : address.equals(that.address);
            return sameAddress && requestId == that.requestId;
        }

        @Override
        public String toString() {
            return address + "::" + requestId;
        }
    }

    /**
     * Marker header without any payload. sendRequest() attaches it to every
     * message it sends, and up() only processes messages that carry it.
     */
    public static class ExecutorHeader
    extends Header {
        /** Reports a size of 0 because the header has no content. */
        @Override
        public int size() {
            return 0;
        }

        /** Writes nothing. */
        @Override
        public void writeTo(DataOutputStream out) throws IOException {
        }

        /** Reads nothing. */
        @Override
        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
        }
    }

    /**
     * Payload of every executor message: a request type, an optional object
     * and a request id.
     * <p>
     * Wire format: one byte with the type's ordinal, then the object, then
     * the request id as a long. A {@link Streamable} object is announced by
     * the short -1 and written as a generic streamable; any other object is
     * written as an int length followed by that many bytes.
     */
    protected static class Request
    implements Streamable {
        protected Type type;
        protected Object object;
        protected long request;

        /** Creates an empty request, to be filled by {@link #readFrom}. */
        public Request() {
        }

        /**
         * @param type    the request type
         * @param object  optional payload
         * @param request the request id
         */
        public Request(Type type, Object object, long request) {
            this.type = type;
            this.object = object;
            this.request = request;
        }

        @Override
        public void writeTo(DataOutputStream out) throws IOException {
            out.writeByte(this.type.ordinal());
            try {
                if (this.object instanceof Streamable) {
                    // Marker that distinguishes a streamable from a byte array.
                    out.writeShort(-1);
                    Util.writeGenericStreamable((Streamable)this.object, out);
                } else {
                    byte[] bytes = Util.objectToByteBuffer(this.object);
                    out.writeInt(bytes.length);
                    out.write(bytes);
                }
            }
            catch (IOException e) {
                throw e;
            }
            catch (Exception e) {
                throw new IOException("Exception encountered while serializing execution request", e);
            }
            out.writeLong(this.request);
        }

        @Override
        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
            int ordinal = in.readByte();
            Type[] types = Type.values();
            // Reject bytes that do not name a known type instead of failing
            // with an ArrayIndexOutOfBoundsException.
            if (ordinal < 0 || ordinal >= types.length) {
                throw new IOException("Unknown execution request type ordinal: " + ordinal);
            }
            this.type = types[ordinal];
            try {
                short first = in.readShort();
                if (first == -1) {
                    this.object = Util.readGenericStreamable(in);
                } else {
                    // The short just read is the upper half of the int length
                    // written by writeTo(); combine it with the lower half.
                    ByteBuffer bb = ByteBuffer.allocate(4);
                    bb.putShort(first);
                    bb.putShort(in.readShort());
                    int size = bb.getInt(0);
                    byte[] bytes = new byte[size];
                    in.readFully(bytes, 0, size);
                    this.object = Util.objectFromByteBuffer(bytes);
                }
            }
            catch (IOException e) {
                throw e;
            }
            catch (Exception e) {
                throw new IOException("Exception encountered while deserializing execution request", e);
            }
            this.request = in.readLong();
        }

        @Override
        public String toString() {
            // type is null for an instance created by the no-arg constructor
            // that has not been read yet.
            String typeName = this.type != null ? this.type.name() : "null";
            return typeName + " [" + this.object + (this.request != -1L ? " request id: " + this.request : "") + "]";
        }
    }

    /**
     * Kinds of requests exchanged between members.
     * <p>
     * Request.writeTo() transmits a type as its ordinal and Request.readFrom()
     * looks it up by index in values(), so the declaration order of the
     * constants is part of the wire format.
     */
    protected static enum Type {
        // A member asks for a consumer to run a task.
        RUN_REQUEST,
        // A consumer thread announces that it is waiting for a task.
        CONSUMER_READY,
        // A consumer withdraws its readiness announcement.
        CONSUMER_UNREADY,
        // Tells a requester which consumer it may use.
        CONSUMER_FOUND,
        // Carries the task to the consumer that should run it.
        RUN_SUBMITTED,
        // The consumer could not run the task; the owner resubmits it.
        RUN_REJECTED,
        // The task failed; the payload is the throwable.
        RESULT_EXCEPTION,
        // The task completed; the payload is its result.
        RESULT_SUCCESS,
        // Asks the consumer to interrupt a running task.
        INTERRUPT_RUN,
        // Handled by adding / removing an entry in the local queues of pending
        // run requests and available consumers (see up()).
        CREATE_RUN_REQUEST,
        CREATE_CONSUMER_READY,
        DELETE_RUN_REQUEST,
        DELETE_CONSUMER_READY;

    }
}

