/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans.ov.sg;

import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.piax.common.Endpoint;
import org.piax.gtrans.RPCException;
import org.piax.gtrans.ov.ddll.Link;
import org.piax.gtrans.ov.sg.SkipGraph;
import org.piax.gtrans.ov.sg.SkipGraphIf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SGMessagingFramework<E extends Endpoint> {
    private static final Logger logger = LoggerFactory.getLogger(SGMessagingFramework.class);
    public static final int DUMMY_MSGID = 0;
    public static int ACK_TIMEOUT_THRES = 19900;
    public static int ACK_TIMEOUT_TIMER = ACK_TIMEOUT_THRES + 100;
    private final Map<Integer, SGRequestMessage<E>> msgStore = new ConcurrentHashMap<Integer, SGRequestMessage<E>>();
    private AtomicInteger nextId = new AtomicInteger(0);
    E myLocator;
    private final SkipGraph<E> sg;
    public static int MSGSTORE_EXPIRATION_TASK_PERIOD = 60000;
    TimerTask cleaner;

    public SGMessagingFramework(SkipGraph<E> sg) {
        this.sg = sg;
        this.myLocator = sg.myLocator;
        this.cleaner = new TimerTask(){

            @Override
            public void run() {
                SGMessagingFramework.this.removeExpiredFromMsgStore();
            }
        };
        sg.timer.schedule(this.cleaner, MSGSTORE_EXPIRATION_TASK_PERIOD, (long)MSGSTORE_EXPIRATION_TASK_PERIOD);
    }

    private void removeExpiredFromMsgStore() {
        long now = System.currentTimeMillis();
        Iterator<Integer> it = this.msgStore.keySet().iterator();
        while (it.hasNext()) {
            int msgId = it.next();
            SGRequestMessage<E> entry = this.msgStore.get(msgId);
            if (entry.timestamp >= now - (long)entry.expire) continue;
            logger.debug("removing expired: {}", (Object)msgId);
            it.remove();
        }
    }

    public void fin() {
        this.cleaner.cancel();
    }

    int getNextMsgId() {
        return this.nextId.incrementAndGet();
    }

    private void send(E dst, SGRequestMessage<E> msg, boolean waitForAck) {
        this.prepareReceivingReply(msg, waitForAck, true);
        logger.debug("* request {} ===> {}: {}", new Object[]{this.myLocator, dst, msg});
        try {
            SkipGraphIf stub = (SkipGraphIf)this.sg.getStub(dst);
            stub.requestMsgReceived(msg);
        }
        catch (RPCException e) {
            logger.info("", (Throwable)e);
        }
    }

    private void prepareReceivingReply(SGRequestMessage<E> msg, boolean expectAck) {
        this.prepareReceivingReply(msg, expectAck, false);
    }

    private void prepareReceivingReply(final SGRequestMessage<E> msg, boolean expectAck, final boolean callOnTimeout) {
        if (msg.timeoutTask != null) {
            throw new Error("sent twice: " + this);
        }
        assert (msg.msgId != 0);
        this.msgStore.put(msg.msgId, msg);
        logger.trace("prepareReceivingReply: {}: {}", (Object)this.sg.toStringShort(), msg);
        if (!expectAck) {
            assert (!callOnTimeout);
            return;
        }
        msg.timeoutTask = new TimerTask(){

            @Override
            public void run() {
                logger.debug("timeout, waiting ack for {}", (Object)msg);
                SGMessagingFramework.this.msgStore.remove(msg.msgId);
                logger.trace("unregister by timeout: {}: {}", (Object)SGMessagingFramework.this.sg.toStringShort(), (Object)msg);
                if (callOnTimeout) {
                    msg.onTimeOut(SGMessagingFramework.this.sg);
                }
            }
        };
        this.sg.timer.schedule(msg.timeoutTask, ACK_TIMEOUT_TIMER);
    }

    public void requestMsgReceived(SGRequestMessage<E> req) {
        req.sgmf = this;
        req.isRecvdInstance = true;
        logger.debug("* ack {} ===> {}: msgId = {}", new Object[]{this.myLocator, req.sender, req.msgId});
        try {
            SkipGraphIf stub = (SkipGraphIf)this.sg.getStub(req.sender);
            stub.ackReceived(req.msgId);
        }
        catch (RPCException e) {
            logger.info("", (Throwable)e);
        }
        logger.debug("execute, msgId = {}", (Object)req.msgId);
        req.execute(this.sg);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void ackReceived(int msgId) {
        String h = "ackReceived(" + this.myLocator + ")";
        SGRequestMessage<E> req = this.msgStore.get(msgId);
        if (req == null) {
            logger.info("{}: no corresponding request (maybe ok): msgId = {}", (Object)h, (Object)msgId);
        } else {
            SGRequestMessage<E> sGRequestMessage = req;
            synchronized (sGRequestMessage) {
                logger.debug("{}: acked: {}", (Object)h, req);
                if (req.timeoutTask != null) {
                    req.timeoutTask.cancel();
                }
                req.ackReceived = true;
                if (req.isDirectReturn) {
                    this.msgStore.remove(msgId);
                    logger.trace("unregister by Ack: {}: {}", (Object)this.sg.toStringShort(), req);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void replyMsgReceived(SGReplyMessage<E> repl) {
        String h = "replyMsgReceived(" + this.myLocator + ")";
        SGRequestMessage<E> req = this.msgStore.get(repl.replyId);
        if (req == null) {
            logger.info("{}: no corresponding request (maybe ok): replyId={}", (Object)h, (Object)repl.replyId);
        } else {
            assert (req.mayReceiveReply());
            req.sgmf = this.sg.sgmf;
            SGRequestMessage<E> sGRequestMessage = req;
            synchronized (sGRequestMessage) {
                logger.debug("{}: receive reply, replyId={}", (Object)h, (Object)repl.replyId);
                if (req.timeoutTask != null) {
                    req.timeoutTask.cancel();
                }
                boolean removeOK = req.onReceivingReply(this.sg, repl);
                logger.debug("{}: removeOK = {}", (Object)h, (Object)removeOK);
                if (removeOK) {
                    this.msgStore.remove(repl.replyId);
                    logger.trace("unregister by reply: {}: {}", (Object)this.sg.toStringShort(), req);
                    logger.debug("{}: msgStore = {}", (Object)h, this.msgStore);
                }
            }
        }
    }

    public static abstract class SGReplyMessage<E extends Endpoint>
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final Logger logger = LoggerFactory.getLogger(SGReplyMessage.class);
        final E sender;
        final int replyId;
        final transient SkipGraph<E> sg;
        final transient E receiver;

        public SGReplyMessage(SkipGraph<E> sg, SGRequestMessage<E> received) {
            this.sender = sg.myLocator;
            this.receiver = received.replyTo;
            this.sg = sg;
            this.replyId = received.replyId;
            logger.debug("SGReplyMessage: replyId = " + this.replyId);
        }

        public synchronized void reply() {
            logger.debug("* reply {} ===> {}: replyId = {}", new Object[]{this.sender, this.receiver, this.replyId});
            try {
                SkipGraphIf stub = (SkipGraphIf)this.sg.getStub(this.receiver);
                stub.replyMsgReceived(this);
            }
            catch (RPCException e) {
                logger.info("", (Throwable)e);
            }
        }
    }

    public static abstract class SGRequestMessage<E extends Endpoint>
    implements Serializable {
        private static final long serialVersionUID = 1L;
        final E sender;
        Link receiver;
        final int msgId;
        final E replyTo;
        final int replyId;
        final boolean isDirectReturn;
        final transient boolean isRoot;
        final int expire;
        transient TimerTask timeoutTask;
        transient SGMessagingFramework<E> sgmf;
        transient long timestamp;
        transient boolean ackReceived = false;
        transient boolean isRecvdInstance = false;

        public SGRequestMessage(SGMessagingFramework<E> sgmf, boolean isRoot, boolean isDirectReturn, E replyTo, int replyId, int expire) {
            assert (isDirectReturn == (replyTo != null));
            this.sgmf = sgmf;
            this.sender = sgmf.myLocator;
            this.isRoot = isRoot;
            this.isDirectReturn = isDirectReturn;
            this.msgId = sgmf.getNextMsgId();
            this.expire = expire;
            if (isDirectReturn) {
                assert (replyTo != null);
                this.replyTo = replyTo;
                if (isRoot) {
                    assert (replyId == 0);
                    this.replyId = this.msgId;
                } else {
                    assert (replyId != 0);
                    this.replyId = replyId;
                }
            } else {
                assert (replyTo == null);
                assert (replyId == 0);
                this.replyTo = this.sender;
                this.replyId = this.msgId;
            }
            if (isRoot && isDirectReturn) {
                ((SGMessagingFramework)sgmf).prepareReceivingReply(this, false);
            }
        }

        public SGRequestMessage(SGRequestMessage<E> msg) {
            this.sgmf = msg.sgmf;
            this.sender = this.sgmf.myLocator;
            this.isRoot = msg.isRoot;
            this.isDirectReturn = msg.isDirectReturn;
            this.msgId = this.sgmf.getNextMsgId();
            this.replyTo = msg.replyTo;
            this.replyId = msg.replyId;
            this.expire = msg.expire;
            if (this.isRoot && this.isDirectReturn) {
                ((SGMessagingFramework)this.sgmf).prepareReceivingReply(this, false);
            }
        }

        public void send(Link dst) {
            assert (!this.isRecvdInstance);
            this.receiver = dst;
            this.timestamp = System.currentTimeMillis();
            ((SGMessagingFramework)this.sgmf).send(dst.addr, this, true);
        }

        public boolean isAckTimedOut() {
            if (this.ackReceived) {
                return false;
            }
            return this.timestamp != 0L && System.currentTimeMillis() - this.timestamp > (long)ACK_TIMEOUT_THRES;
        }

        public void prepareReceivingReply() {
            assert (!this.isRecvdInstance);
            if (this.mayReceiveReply()) {
                logger.debug("prepareForReply");
                ((SGMessagingFramework)this.sgmf).prepareReceivingReply(this, false);
            }
        }

        public boolean mayReceiveReply() {
            return !this.isDirectReturn || this.isRoot;
        }

        public abstract void execute(SkipGraph<E> var1);

        public abstract boolean onReceivingReply(SkipGraph<E> var1, SGReplyMessage<E> var2);

        public abstract void onTimeOut(SkipGraph<E> var1);
    }
}

