/*
 * Decompiled with CFR 0.152.
 */
package org.piax.gtrans.ov.ring.rq;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import org.piax.common.Endpoint;
import org.piax.common.Id;
import org.piax.common.ObjectId;
import org.piax.common.TransportId;
import org.piax.common.subspace.CircularRange;
import org.piax.common.subspace.Range;
import org.piax.gtrans.ChannelTransport;
import org.piax.gtrans.IdConflictException;
import org.piax.gtrans.RPCException;
import org.piax.gtrans.RemoteValue;
import org.piax.gtrans.TransOptions;
import org.piax.gtrans.impl.NestedMessage;
import org.piax.gtrans.ov.ddll.DdllKey;
import org.piax.gtrans.ov.ddll.Link;
import org.piax.gtrans.ov.ddll.Node;
import org.piax.gtrans.ov.ddll.NodeManagerIf;
import org.piax.gtrans.ov.ring.NoSuchKeyException;
import org.piax.gtrans.ov.ring.RingIf;
import org.piax.gtrans.ov.ring.RingManager;
import org.piax.gtrans.ov.ring.RingVNode;
import org.piax.gtrans.ov.ring.TemporaryIOException;
import org.piax.gtrans.ov.ring.UnavailableException;
import org.piax.gtrans.ov.ring.rq.DKRangeRValue;
import org.piax.gtrans.ov.ring.rq.MessagePath;
import org.piax.gtrans.ov.ring.rq.QueryId;
import org.piax.gtrans.ov.ring.rq.RQAlgorithm;
import org.piax.gtrans.ov.ring.rq.RQExecQueryCallback;
import org.piax.gtrans.ov.ring.rq.RQIf;
import org.piax.gtrans.ov.ring.rq.RQMessage;
import org.piax.gtrans.ov.ring.rq.RQResults;
import org.piax.gtrans.ov.ring.rq.RQReturn;
import org.piax.gtrans.ov.ring.rq.RQVNode;
import org.piax.gtrans.ov.ring.rq.RangeUtils;
import org.piax.gtrans.ov.ring.rq.SubRange;
import org.piax.util.StrictMap;
import org.piax.util.UniqId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RQManager<E extends Endpoint>
extends RingManager<E>
implements RQIf<E> {
    static final Logger logger = LoggerFactory.getLogger(RQManager.class);
    public static int QID_EXPIRE = 120000;
    public static int QID_EXPIRATION_TASK_PERIOD = 20000;
    public static int RQ_NRECENT = 10;
    public static int RQ_FLUSH_PERIOD = 2000;
    public static int RQ_EXPIRATION_GRACE = 5000;
    public static int RQ_RETRANS_PERIOD = 10000;
    public static final String QUERY_INSERT_POINT_SPECIAL = "*InsertPointSpecial*";
    public static final String QUERY_KEY_SPECIAL = "*QueryKeySpecial*";
    public static final ObjectId RQ_QUERY_AT_FIND = new ObjectId("*QueryAtFind*");
    public static int FIND_INSERT_POINT_TIMEOUT = 30000;
    protected static final UniqId FIXPEERID = UniqId.PLUS_INFINITY;
    public final Link FIXLEFT;
    public static final boolean NEWALGORITHM = true;
    protected final RQExecQueryCallback execQueryCallback;
    private RQAlgorithm stdRQAlgo;

    public RQManager(TransportId transId, ChannelTransport<E> trans, RQExecQueryCallback execQueryCallback) throws IdConflictException, IOException {
        super(transId, trans);
        this.execQueryCallback = execQueryCallback;
        this.FIXLEFT = new Link(this.myLocator, new DdllKey(Integer.valueOf(0), FIXPEERID));
        this.schedule(new PurgeTask(), (long)(Math.random() * (double)QID_EXPIRATION_TASK_PERIOD), (long)QID_EXPIRATION_TASK_PERIOD);
    }

    protected boolean preferDelegateNodeLeftSide() {
        return true;
    }

    @Override
    public RQVNode<E> getVNode(Comparable<?> rawkey) {
        return (RQVNode)this.keyHash.get(rawkey);
    }

    protected void setRQAlgorithm(RQAlgorithm algo) {
        this.stdRQAlgo = algo;
    }

    @Override
    public RQIf<E> getStub(E addr, int rpcTimeout) {
        return (RQIf)super.getStub((Endpoint)addr, rpcTimeout);
    }

    @Override
    public RQIf<E> getStub(Endpoint dst) {
        return (RQIf)super.getStub(dst);
    }

    @Override
    public Node.InsertPoint findImmedNeighbors(E introducer, DdllKey key, Object query, TransOptions opts) throws UnavailableException, IOException {
        NavigableMap<DdllKey, Link> links;
        logger.debug("introducer={}, key={}", introducer, (Object)key);
        if (introducer == null) {
            this.rtLockR();
            links = this.getAvailableLinks();
            this.rtUnlockR();
            if (links.size() == 0) {
                return null;
            }
        } else {
            Node.InsertPoint ip;
            RingIf stub = this.getStub((Endpoint)introducer);
            try {
                ip = stub.findImmedNeighbors(null, key, query, opts);
            }
            catch (RPCException e) {
                logger.debug("", (Throwable)e);
                if (e.getCause() instanceof IOException) {
                    throw (IOException)e.getCause();
                }
                throw new IOException(e.getCause());
            }
            return ip;
        }
        SubRange range = new SubRange(key, true, key, true);
        TransOptions newOpts = opts == null ? new TransOptions((long)FIND_INSERT_POINT_TIMEOUT, TransOptions.ResponseType.DIRECT) : opts;
        long timeout = newOpts.getTimeout();
        RQReturn rqRet = this.rqStartKeyRange(Collections.singleton(range), query == null ? QUERY_INSERT_POINT_SPECIAL : query, newOpts, RQ_RETRANS_PERIOD, links, this.stdRQAlgo);
        logger.debug("find: waiting {}", (Object)rqRet);
        try {
            Collection<RemoteValue<?>> col = rqRet.get(timeout);
            logger.debug("find: col = {}", col);
        }
        catch (InterruptedException e) {
            throw new IOException("range query timeout");
        }
        logger.debug("rqRet = {}", (Object)rqRet);
        for (DKRangeRValue kr : rqRet.rvals.values()) {
            if (kr.getRemoteValue().getOption() == null) continue;
            Node.InsertPoint insp = (Node.InsertPoint)kr.getRemoteValue().getOption();
            logger.debug("find: insert point = {}, {}", (Object)insp, (Object)kr);
            return insp;
        }
        if (TransOptions.responseType(opts) == TransOptions.ResponseType.NO_RESPONSE) {
            return null;
        }
        throw new TemporaryIOException("could not find insert point @" + this.getEndpoint() + " for " + key);
    }

    public RQResults scalableRangeQueryPro(Collection<? extends Range<?>> ranges, Object query, TransOptions opts) {
        if (ranges.size() == 0) {
            return new RQResults();
        }
        RQReturn rqRet = this.rqStartRawRange(ranges, query, opts, RQ_RETRANS_PERIOD, null, this.stdRQAlgo);
        return rqRet.results;
    }

    protected RQReturn rqStartRawRange(Collection<? extends Range<?>> ranges, Object query, TransOptions opts, int retransPeriod, NavigableMap<DdllKey, Link> allLinks, RQAlgorithm rqAlgo) {
        ArrayList<SubRange> subRanges = new ArrayList<SubRange>();
        for (Range<?> range : ranges) {
            SubRange keyRange = RQManager.convertToSubRange(range);
            keyRange.assignId();
            subRanges.add(keyRange);
        }
        return this.rqStartKeyRange(subRanges, query, opts, retransPeriod, allLinks, rqAlgo);
    }

    public static SubRange convertToSubRange(Range<? extends Comparable<?>> range) {
        SubRange keyRange = new SubRange(new DdllKey((Comparable<?>)range.from, range.fromInclusive ? UniqId.MINUS_INFINITY : UniqId.PLUS_INFINITY), true, new DdllKey((Comparable<?>)range.to, range.toInclusive ? UniqId.PLUS_INFINITY : UniqId.MINUS_INFINITY), false);
        return keyRange;
    }

    private RQReturn rqStartKeyRange(Collection<SubRange> ranges, Object query, TransOptions opts, int retransPeriod, NavigableMap<DdllKey, Link> allLinks, RQAlgorithm rqAlgo) {
        QueryId qid = new QueryId(this.peerId, this.rand.nextLong());
        if (opts == null) {
            opts = new TransOptions();
        }
        RQMessage msg = rqAlgo.newRQMessage4Root(this.msgframe, ranges, qid, query, opts);
        this.rqDisseminate(msg, allLinks);
        return msg.rqRet;
    }

    public void rqDisseminate(RQMessage msg) {
        this.rqDisseminate(msg, null);
    }

    public void rqDisseminate(RQMessage msg, NavigableMap<DdllKey, Link> allLinks) {
        this.rtLockW();
        try {
            this.rqDisseminate0(msg, allLinks);
        }
        finally {
            this.rtUnlockW();
        }
    }

    private void rqDisseminate0(RQMessage msg, NavigableMap<DdllKey, Link> allLinks) {
        String h = "rqDiss(id=" + msg.msgId + ")";
        logger.debug("{}: msg = {}", (Object)h, (Object)msg);
        if (allLinks == null) {
            allLinks = this.getAvailableLinks();
        }
        if (allLinks.isEmpty()) {
            logger.warn("routing table is empty!: {}", (Object)this);
            return;
        }
        StrictMap<Id, ArrayList<SubRange>> map = new StrictMap<Id, ArrayList<SubRange>>(new HashMap());
        ArrayList rvals = new ArrayList();
        for (SubRange subRange : msg.subRanges) {
            List<SubRange> subsubRanges = this.rqSplit(msg.query, subRange, allLinks, msg.failedLinks, rvals, msg.getRangeQueryAlgorithm());
            if (subsubRanges == null) continue;
            logger.debug("subsubRanges = {}", subsubRanges);
            for (SubRange subRange2 : subsubRanges) {
                UniqId pid = subRange2.getLink() == this.FIXLEFT ? FIXPEERID : subRange2.getLink().key.getUniqId();
                ArrayList<SubRange> list = (ArrayList<SubRange>)map.get(pid);
                if (list == null) {
                    list = new ArrayList<SubRange>();
                    map.put(pid, list);
                }
                list.add(subRange2);
            }
        }
        logger.debug("{}: aggregated: {}", (Object)h, map);
        logger.debug("{}: msg = {}", (Object)h, (Object)msg.toString());
        if (msg.rqRet == null) {
            msg.rqRet = new RQReturn(this, msg, msg.opts, msg.isRoot);
        }
        RQReturn rqRet = msg.rqRet;
        if (TransOptions.inspect(msg.opts)) {
            rqRet.updateHops(msg.hops);
        }
        HashSet<MessagePath> paths = new HashSet<MessagePath>();
        List<CircularRange<DdllKey>> failedRanges = new ArrayList<CircularRange<DdllKey>>();
        for (Map.Entry entry : map.entrySet()) {
            Id p = (Id)entry.getKey();
            if (p.equals(FIXPEERID)) {
                failedRanges.addAll((Collection)entry.getValue());
                continue;
            }
            if (p.equals(this.peerId)) continue;
            List subRanges = (List)entry.getValue();
            logger.debug("{}: forward {}, {}", new Object[]{h, p, subRanges});
            RQMessage m = msg.newChildInstance(subRanges);
            Link l = ((SubRange)subRanges.get(0)).getLink();
            rqRet.sendChildMessage(l, m);
            if (!TransOptions.inspect(msg.opts)) continue;
            DdllKey from = ((RingVNode)this.keyHash.firstEntry().getValue()).getKey();
            MessagePath mp = new MessagePath(msg.hops + 1, from, l.key, subRanges);
            logger.debug("mp={}", (Object)mp);
            paths.add(mp);
        }
        failedRanges = RangeUtils.concatAdjacentRanges(failedRanges);
        logger.debug(h + ": merged failedRanges = " + failedRanges);
        if (!msg.failedLinks.isEmpty()) {
            // empty if block
        }
        msg.getRangeQueryAlgorithm().rqExecuteLocal(msg, (List)map.get(this.peerId), rvals);
        logger.debug("rqDisseminate: rvals = {}", rvals);
        if (TransOptions.inspect(msg.opts)) {
            rqRet.addMessagePaths(paths);
        }
        rqRet.addRemoteValues(rvals);
        TransOptions.ResponseType rtype = TransOptions.responseType(msg.opts);
        if (rtype == TransOptions.ResponseType.DIRECT && !msg.isRoot || rtype == TransOptions.ResponseType.NO_RESPONSE) {
            rqRet.flush();
            rqRet.dispose();
        }
        logger.debug("rqDisseminate finished");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected List<SubRange> rqSplit(Object query, SubRange range0, NavigableMap<DdllKey, Link> allLinks, Collection<Endpoint> failedLinks, List<DKRangeRValue<?>> rvals, RQAlgorithm rqAlgo) {
        String h = "rqSplit";
        if (failedLinks == null) {
            failedLinks = Collections.emptySet();
        }
        Iterator<Endpoint> fi = failedLinks.iterator();
        while (fi.hasNext()) {
            Endpoint f = fi.next();
            if (!this.getEndpoint().equals(f)) continue;
            fi.remove();
        }
        SubRange range = range0;
        this.rtLockR();
        try {
            for (RingVNode node : this.keyHash.values()) {
                Range<DdllKey> removed;
                if (node.getMode() != RingVNode.VNodeMode.INSERTED) continue;
                Link right = node.getSuccessor();
                logger.debug("{}, node = {}, range = {}, right = {}", new Object[]{h, node, range, right});
                assert (right != null);
                if (range.contains(node.getKey()) || (removed = RangeUtils.removedRange(range, node.getKey(), right.key)) == null) continue;
                RemoteValue rv = new RemoteValue(this.peerId);
                Node.InsertPoint insp = new Node.InsertPoint(node.getLocalLink(), right);
                rv.setOption(insp);
                DKRangeRValue kr = new DKRangeRValue(rv, removed);
                logger.debug("{}: dummy rval {}", (Object)h, kr);
                rvals.add(kr);
                SubRange[] ranges = range.retainRanges(node.getKey(), right.key);
                logger.debug("{}: retain = {}", (Object)h, (Object)ranges);
                if (ranges == null) {
                    List<SubRange> list = null;
                    return list;
                }
                range = ranges[0];
            }
        }
        finally {
            this.rtUnlockR();
        }
        if (range0 != range) {
            logger.debug("{}: shrunk, {} => {}", new Object[]{h, range0, range});
        }
        List<SubRange> ranges = rqAlgo.assignDelegate(query, range, allLinks, failedLinks);
        return ranges;
    }

    public List<RemoteValue<?>> forwardQuery(boolean isPlusDir, Range<?> range, int maxNum, Object query, TransOptions opts) {
        logger.trace("ENTRY:");
        logger.debug("isPlusdir:{}, range:{}, num:{}", new Object[]{isPlusDir, range, maxNum});
        try {
            if (!isPlusDir) {
                List<RemoteValue<?>> list = this.forwardQueryLeft(range, maxNum, query, opts, false);
                return list;
            }
            throw new UnsupportedOperationException("upper is not supported");
        }
        finally {
            logger.trace("EXIT:");
        }
    }

    private List<RemoteValue<?>> forwardQueryLeft(Range<?> range, int num, Object query, TransOptions opts, boolean wrapAround) {
        Object rawFromKey = range.to;
        DdllKey fromKey = range.toInclusive ? new DdllKey((Comparable<?>)rawFromKey, UniqId.PLUS_INFINITY) : new DdllKey((Comparable<?>)rawFromKey, UniqId.MINUS_INFINITY);
        Object rawToKey = range.from;
        DdllKey toKey = range.fromInclusive ? new DdllKey((Comparable<?>)rawToKey, UniqId.MINUS_INFINITY) : new DdllKey((Comparable<?>)rawToKey, UniqId.PLUS_INFINITY);
        ArrayList rset = new ArrayList();
        QueryId qid = new QueryId(this.peerId, this.rand.nextLong());
        LinkedList<Link> trace = new LinkedList<Link>();
        boolean getStartNode = true;
        Link n = null;
        Link nRight = null;
        while (true) {
            RingManager.ExecQueryReturn eqr;
            if (getStartNode) {
                try {
                    logger.debug("num={}, opts={}, query={}", new Object[]{num, opts, query});
                    Node.InsertPoint links = this.find(null, fromKey, true, num == 1 && TransOptions.responseType(opts) == TransOptions.ResponseType.NO_RESPONSE ? new NestedMessage(RQ_QUERY_AT_FIND, RQ_QUERY_AT_FIND, this.peerId, this.myLocator, query) : null, opts);
                    if (links == null) {
                        return null;
                    }
                    n = links.left;
                    nRight = links.right;
                }
                catch (UnavailableException e) {
                    logger.error("", (Throwable)e);
                    return null;
                }
                catch (IOException e) {
                    logger.error("", (Throwable)e);
                    return null;
                }
                getStartNode = false;
            }
            if (!new Range<DdllKey>(toKey, fromKey).contains(n.key)) {
                logger.debug("forwardQueryLeft: finish (reached end of range) [{} {}], {}", new Object[]{toKey, fromKey, n.key});
                break;
            }
            if (!wrapAround && keyComp.compare((Comparable<?>)rawFromKey, n.key.getPrimaryKey()) < 0) {
                logger.debug("forwardQueryLeft: finish (no node is smaller than rawFromKey)");
                break;
            }
            logger.debug("forwardQueryLeft: query to " + n);
            RingIf stub = this.getStub(n.addr);
            boolean doAction = true;
            try {
                eqr = stub.invokeExecQuery(n.key.getPrimaryKey(), nRight, qid, doAction, query, opts);
            }
            catch (RingManager.RightNodeMismatch e) {
                logger.debug("", (Throwable)e);
                if (keyComp.isOrdered(n.key, e.curRight.key, nRight.key)) {
                    n = e.curRight;
                    logger.debug("forwardQueryLeft: right node mismatch. restart from {}", (Object)n);
                    continue;
                }
                nRight = e.curRight;
                logger.debug("forwardQueryLeft: right node mismatch. continue from {}", (Object)n);
                continue;
            }
            catch (Throwable e) {
                Throwable cause;
                assert (e instanceof UnavailableException || e instanceof RPCException);
                Throwable throwable = cause = e instanceof RPCException ? e.getCause() : e;
                if (!(cause instanceof UnavailableException) && !(cause instanceof InterruptedIOException)) {
                    logger.error("forwardQueryLeft: got {} when calling invokeExecQuery() on ", (Object)cause, (Object)n);
                    break;
                }
                logger.debug("", cause);
                if (trace.size() == 0) {
                    logger.debug("forwardQueryLeft: start over");
                    getStartNode = true;
                    continue;
                }
                Link next = (Link)trace.removeLast();
                logger.debug("forwardQueryLeft: retry from {}", (Object)next);
                NodeManagerIf stub2 = (NodeManagerIf)this.manager.getStub(next.addr);
                try {
                    stub2.startFix(next.key, n, false);
                }
                catch (Exception e2) {
                    logger.info("", (Throwable)e2);
                }
                try {
                    Thread.sleep(Node.GETSTAT_OP_TIMEOUT + 100);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                n = next;
                nRight = null;
                continue;
            }
            if (doAction && eqr.key != null) {
                rset.add(eqr.key);
            }
            if (rset.size() >= num) {
                logger.debug("forwardQueryLeft: got enough");
                break;
            }
            if (Node.isOrdered(eqr.left.key, fromKey, n.key)) {
                logger.debug("forwardQueryLeft: circulated");
                break;
            }
            if (n.key.equals(eqr.left.key) && n.key.equals(eqr.right.key)) {
                logger.debug("forwardQueryLeft: just single node exists");
                break;
            }
            if (n.equals(eqr.left)) continue;
            trace.addLast(n);
            nRight = n;
            n = eqr.left;
            if (trace.size() > RQ_NRECENT) {
                trace.removeFirst();
            }
            logger.debug("trace= {}", trace);
        }
        return rset;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public RingManager.ExecQueryReturn invokeExecQuery(Comparable<?> rawkey, Link curRight, QueryId qid, boolean doAction, Object query, TransOptions opts) throws NoSuchKeyException, RingManager.RightNodeMismatch {
        RingVNode snode;
        logger.trace("ENTRY:");
        logger.debug("invokeExecQuery: rawkey={}, curRight={}, qid={}, doAction={}", new Object[]{rawkey, curRight, qid, doAction});
        RingManager.ExecQueryReturn eqr = new RingManager.ExecQueryReturn();
        this.rtLockR();
        try {
            snode = this.getVNode((Comparable)rawkey);
            if (snode == null) {
                throw new NoSuchKeyException(rawkey + ", " + this.keyHash);
            }
            Node node = snode.getDdllNode();
            node.lock();
            if (node.getMode() == Node.Mode.GRACE || node.getMode() == Node.Mode.OUT) {
                node.unlock();
                throw new NoSuchKeyException(rawkey + " has been deleted");
            }
            eqr.right = node.getRight();
            eqr.left = node.getLeft();
            node.unlock();
            if (curRight != null && !curRight.equals(eqr.right)) {
                throw new RingManager.RightNodeMismatch(eqr.right);
            }
            if (snode.getMode() == RingVNode.VNodeMode.DELETING) {
                logger.debug("invokeExecQuery: ignore deleting node {}", (Object)snode);
                RingManager.ExecQueryReturn execQueryReturn = eqr;
                return execQueryReturn;
            }
            if (!doAction) {
                RingManager.ExecQueryReturn execQueryReturn = eqr;
                return execQueryReturn;
            }
        }
        finally {
            this.rtUnlockR();
        }
        RemoteValue<?> r = null;
        if (TransOptions.deliveryMode(opts) == TransOptions.DeliveryMode.ACCEPT_ONCE && (r = ((RQVNode)snode).store.get(qid)) != null) {
            eqr.key = r;
            logger.debug("invokeExecQuery returns {}", (Object)eqr);
            return eqr;
        }
        RemoteValue<?> rval = this.execQuery(rawkey, query);
        if (TransOptions.deliveryMode(opts) == TransOptions.DeliveryMode.ACCEPT_ONCE) {
            ((RQVNode)snode).store.put(qid, rval);
        }
        eqr.key = rval;
        logger.debug("invokeExecQuery returns {}", (Object)eqr);
        return eqr;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RemoteValue<?> execQuery(Comparable<?> key, Object query) {
        logger.trace("ENTRY:");
        try {
            logger.debug("execQuery: key={}, query={}", key, query);
            if (this.execQueryCallback == null) {
                RemoteValue remoteValue = new RemoteValue((Endpoint)this.peerId, key);
                return remoteValue;
            }
            RemoteValue<?> remoteValue = this.execQueryCallback.rqExecQuery(key, query);
            return remoteValue;
        }
        finally {
            logger.trace("EXIT:");
        }
    }

    class PurgeTask
    implements Runnable {
        PurgeTask() {
        }

        @Override
        public void run() {
            RQManager.this.rtLockR();
            for (RingVNode _snode : RQManager.this.keyHash.values()) {
                RQVNode snode = (RQVNode)_snode;
                snode.store.removeExpired(QID_EXPIRE);
            }
            RQManager.this.rtUnlockR();
        }
    }
}

