/*
 * Decompiled with CFR 0.152.
 */
package udt;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import udt.UDPEndPoint;
import udt.UDTInputStream;
import udt.UDTPacket;
import udt.UDTSender;
import udt.UDTSession;
import udt.packets.Acknowledgement;
import udt.packets.Acknowledgment2;
import udt.packets.ControlPacket;
import udt.packets.DataPacket;
import udt.packets.KeepAlive;
import udt.packets.NegativeAcknowledgement;
import udt.packets.Shutdown;
import udt.receiver.AckHistoryEntry;
import udt.receiver.AckHistoryWindow;
import udt.receiver.PacketHistoryWindow;
import udt.receiver.PacketPairWindow;
import udt.receiver.ReceiverLossList;
import udt.receiver.ReceiverLossListEntry;
import udt.util.MeanValue;
import udt.util.UDTStatistics;
import udt.util.UDTThreadFactory;
import udt.util.Util;

public class UDTReceiver {
    private static final Logger logger = Logger.getLogger(UDTReceiver.class.getName());
    private final UDPEndPoint endpoint;
    private final UDTSession session;
    private final UDTStatistics statistics;
    private final ReceiverLossList receiverLossList;
    private final AckHistoryWindow ackHistoryWindow;
    private final PacketHistoryWindow packetHistoryWindow;
    private volatile long lastDataPacketArrivalTime = 0L;
    private volatile long largestReceivedSeqNumber = 0L;
    private long lastAckNumber = 0L;
    private volatile long largestAcknowledgedAckNumber = -1L;
    private volatile long expCount = 0L;
    private final PacketPairWindow packetPairWindow;
    long estimateLinkCapacity;
    long packetArrivalSpeed;
    long roundTripTime = 0L;
    long roundTripTimeVar = this.roundTripTime / 2L;
    private long nextACK;
    private long ackTimerInterval = Util.getSYNTime();
    private long nextNAK;
    private long nakTimerInterval = Util.getSYNTime();
    private long nextEXP;
    private long expTimerInterval = 100L * Util.getSYNTime();
    private final long sessionUpSince;
    private final long IDLE_TIMEOUT = 180000L;
    private final long bufferSize;
    private final BlockingQueue<UDTPacket> handoffQueue;
    private Thread receiverThread;
    private volatile boolean stopped = false;
    private volatile long ackInterval = -1L;
    public static boolean connectionExpiryDisabled = false;
    private final boolean storeStatistics;
    private MeanValue dgReceiveInterval;
    private MeanValue dataPacketInterval;
    private MeanValue processTime;
    private MeanValue dataProcessTime;
    public static int dropRate = 0;
    private int n = 0;
    private volatile long ackSequenceNumber = 0L;

    public UDTReceiver(UDTSession session, UDPEndPoint endpoint) {
        this.endpoint = endpoint;
        this.session = session;
        this.sessionUpSince = System.currentTimeMillis();
        this.statistics = session.getStatistics();
        if (!session.isReady()) {
            throw new IllegalStateException("UDTSession is not ready.");
        }
        this.ackHistoryWindow = new AckHistoryWindow(16);
        this.packetHistoryWindow = new PacketHistoryWindow(16);
        this.receiverLossList = new ReceiverLossList();
        this.packetPairWindow = new PacketPairWindow(16);
        this.largestReceivedSeqNumber = session.getInitialSequenceNumber() - 1L;
        this.bufferSize = session.getReceiveBufferSize();
        this.handoffQueue = new ArrayBlockingQueue<UDTPacket>(4 * session.getFlowWindowSize());
        this.storeStatistics = Boolean.getBoolean("udt.receiver.storeStatistics");
        this.initMetrics();
        this.start();
    }

    private void initMetrics() {
        if (!this.storeStatistics) {
            return;
        }
        this.dgReceiveInterval = new MeanValue("UDT receive interval");
        this.statistics.addMetric(this.dgReceiveInterval);
        this.dataPacketInterval = new MeanValue("Data packet interval");
        this.statistics.addMetric(this.dataPacketInterval);
        this.processTime = new MeanValue("UDT packet process time");
        this.statistics.addMetric(this.processTime);
        this.dataProcessTime = new MeanValue("Data packet process time");
        this.statistics.addMetric(this.dataProcessTime);
    }

    private void start() {
        Runnable r = new Runnable(){

            @Override
            public void run() {
                try {
                    UDTReceiver.this.nextACK = Util.getCurrentTime() + UDTReceiver.this.ackTimerInterval;
                    UDTReceiver.this.nextNAK = (long)((double)Util.getCurrentTime() + 1.5 * (double)UDTReceiver.this.nakTimerInterval);
                    UDTReceiver.this.nextEXP = Util.getCurrentTime() + 2L * UDTReceiver.this.expTimerInterval;
                    UDTReceiver.this.ackInterval = UDTReceiver.this.session.getCongestionControl().getAckInterval();
                    while (!UDTReceiver.this.stopped) {
                        UDTReceiver.this.receiverAlgorithm();
                    }
                }
                catch (Exception ex) {
                    logger.log(Level.SEVERE, "", ex);
                }
                logger.info("STOPPING RECEIVER for " + UDTReceiver.this.session);
            }
        };
        this.receiverThread = UDTThreadFactory.get().newThread(r);
        this.receiverThread.start();
    }

    protected void receive(UDTPacket p) throws IOException {
        if (this.storeStatistics) {
            this.dgReceiveInterval.end();
        }
        this.handoffQueue.offer(p);
        if (this.storeStatistics) {
            this.dgReceiveInterval.begin();
        }
    }

    public void receiverAlgorithm() throws InterruptedException, IOException {
        UDTPacket packet;
        long currentTime = Util.getCurrentTime();
        if (this.nextACK < currentTime) {
            this.nextACK = currentTime + this.ackTimerInterval;
            this.processACKEvent(true);
        }
        if (this.nextNAK < currentTime) {
            this.nextNAK = currentTime + this.nakTimerInterval;
            this.processNAKEvent();
        }
        if (this.nextEXP < currentTime) {
            this.nextEXP = currentTime + this.expTimerInterval;
            this.processEXPEvent();
        }
        if ((packet = this.handoffQueue.poll(Util.getSYNTime(), TimeUnit.MICROSECONDS)) != null) {
            ControlPacket cp;
            int cpType;
            this.expCount = 1L;
            boolean needEXPReset = false;
            if (packet.isControlPacket() && ((cpType = (cp = (ControlPacket)packet).getControlPacketType()) == ControlPacket.ControlPacketType.ACK.ordinal() || cpType == ControlPacket.ControlPacketType.NAK.ordinal())) {
                needEXPReset = true;
            }
            if (needEXPReset) {
                this.nextEXP = Util.getCurrentTime() + this.expTimerInterval;
            }
            if (this.storeStatistics) {
                this.processTime.begin();
            }
            this.processUDTPacket(packet);
            if (this.storeStatistics) {
                this.processTime.end();
            }
        }
        Thread.yield();
    }

    protected void processACKEvent(boolean isTriggeredByTimer) throws IOException {
        long ackSeqNumber;
        ReceiverLossListEntry entry = this.receiverLossList.getFirstEntry();
        long ackNumber = entry == null ? this.largestReceivedSeqNumber + 1L : entry.getSequenceNumber();
        if (ackNumber == this.largestAcknowledgedAckNumber) {
            return;
        }
        if (ackNumber == this.lastAckNumber) {
            long timeOfLastSentAck = this.ackHistoryWindow.getTime(this.lastAckNumber);
            if (Util.getCurrentTime() - timeOfLastSentAck < 2L * this.roundTripTime) {
                return;
            }
        }
        if (!isTriggeredByTimer) {
            ackSeqNumber = this.sendLightAcknowledgment(ackNumber);
            return;
        }
        ackSeqNumber = this.sendAcknowledgment(ackNumber);
        AckHistoryEntry sentAckNumber = new AckHistoryEntry(ackSeqNumber, ackNumber, Util.getCurrentTime());
        this.ackHistoryWindow.add(sentAckNumber);
        this.lastAckNumber = ackNumber;
    }

    protected void processNAKEvent() throws IOException {
        List<Long> seqNumbers = this.receiverLossList.getFilteredSequenceNumbers(this.roundTripTime, true);
        this.sendNAK(seqNumbers);
    }

    protected void processEXPEvent() throws IOException {
        if (this.session.getSocket() == null) {
            return;
        }
        UDTSender sender = this.session.getSocket().getSender();
        sender.putUnacknowledgedPacketsIntoLossList();
        if (this.expCount > 16L && System.currentTimeMillis() - this.sessionUpSince > 180000L && !connectionExpiryDisabled && !this.stopped) {
            this.sendShutdown();
            this.stop();
            logger.info("Session " + this.session + " expired.");
            return;
        }
        if (!sender.haveLostPackets()) {
            this.sendKeepAlive();
        }
        ++this.expCount;
    }

    protected void processUDTPacket(UDTPacket p) throws IOException {
        if (!p.isControlPacket()) {
            DataPacket dp = (DataPacket)p;
            if (this.storeStatistics) {
                this.dataPacketInterval.end();
                this.dataProcessTime.begin();
            }
            this.onDataPacketReceived(dp);
            if (this.storeStatistics) {
                this.dataProcessTime.end();
                this.dataPacketInterval.begin();
            }
        } else if (p.getControlPacketType() == ControlPacket.ControlPacketType.ACK2.ordinal()) {
            Acknowledgment2 ack2 = (Acknowledgment2)p;
            this.onAck2PacketReceived(ack2);
        } else if (p instanceof Shutdown) {
            logger.info("Received shutdown packet!!");
            this.onShutdown();
        }
    }

    protected void onDataPacketReceived(DataPacket dp) throws IOException {
        long currentSequenceNumber = dp.getPacketSequenceNumber();
        ++this.n;
        long currentDataPacketArrivalTime = Util.getCurrentTime();
        if (currentSequenceNumber % 16L == 1L && this.lastDataPacketArrivalTime > 0L) {
            long interval = currentDataPacketArrivalTime - this.lastDataPacketArrivalTime;
            this.packetPairWindow.add(interval);
        }
        this.packetHistoryWindow.add(currentDataPacketArrivalTime);
        this.lastDataPacketArrivalTime = currentDataPacketArrivalTime;
        ((UDTInputStream)this.session.getSocket().getInputStream()).haveNewData(currentSequenceNumber, dp.getData());
        if (currentSequenceNumber > this.largestReceivedSeqNumber + 1L) {
            this.sendNAK(currentSequenceNumber);
        } else if (currentSequenceNumber < this.largestReceivedSeqNumber) {
            this.receiverLossList.remove(currentSequenceNumber);
        }
        this.statistics.incNumberOfReceivedDataPackets();
        if (currentSequenceNumber > this.largestReceivedSeqNumber) {
            this.largestReceivedSeqNumber = currentSequenceNumber;
        }
        if (this.ackInterval > 0L && (long)this.n % this.ackInterval == 0L) {
            this.processACKEvent(false);
        }
    }

    protected void sendNAK(long currentSequenceNumber) throws IOException {
        NegativeAcknowledgement nAckPacket = new NegativeAcknowledgement();
        nAckPacket.addLossInfo(this.largestReceivedSeqNumber + 1L, currentSequenceNumber);
        nAckPacket.setSession(this.session);
        nAckPacket.setDestinationID(this.session.getDestination().getSocketID());
        for (long i = this.largestReceivedSeqNumber + 1L; i < currentSequenceNumber; ++i) {
            ReceiverLossListEntry detectedLossSeqNumber = new ReceiverLossListEntry(i);
            this.receiverLossList.insert(detectedLossSeqNumber);
        }
        this.endpoint.doSend(nAckPacket);
        this.statistics.incNumberOfNAKSent();
    }

    protected void sendNAK(List<Long> sequenceNumbers) throws IOException {
        if (sequenceNumbers.size() == 0) {
            return;
        }
        NegativeAcknowledgement nAckPacket = new NegativeAcknowledgement();
        nAckPacket.addLossInfo(sequenceNumbers);
        nAckPacket.setSession(this.session);
        nAckPacket.setDestinationID(this.session.getDestination().getSocketID());
        this.endpoint.doSend(nAckPacket);
        this.statistics.incNumberOfNAKSent();
    }

    protected long sendLightAcknowledgment(long ackNumber) throws IOException {
        Acknowledgement acknowledgmentPkt = this.buildLightAcknowledgement(ackNumber);
        this.endpoint.doSend(acknowledgmentPkt);
        this.statistics.incNumberOfACKSent();
        return acknowledgmentPkt.getAckSequenceNumber();
    }

    protected long sendAcknowledgment(long ackNumber) throws IOException {
        Acknowledgement acknowledgmentPkt = this.buildLightAcknowledgement(ackNumber);
        this.estimateLinkCapacity = this.packetPairWindow.getEstimatedLinkCapacity();
        acknowledgmentPkt.setEstimatedLinkCapacity(this.estimateLinkCapacity);
        this.packetArrivalSpeed = this.packetHistoryWindow.getPacketArrivalSpeed();
        acknowledgmentPkt.setPacketReceiveRate(this.packetArrivalSpeed);
        this.endpoint.doSend(acknowledgmentPkt);
        this.statistics.incNumberOfACKSent();
        this.statistics.setPacketArrivalRate(this.packetArrivalSpeed, this.estimateLinkCapacity);
        return acknowledgmentPkt.getAckSequenceNumber();
    }

    private Acknowledgement buildLightAcknowledgement(long ackNumber) {
        Acknowledgement acknowledgmentPkt = new Acknowledgement();
        acknowledgmentPkt.setNexttoPrevPktSeqNO(ackNumber);
        acknowledgmentPkt.setAckSequenceNumber(++this.ackSequenceNumber);
        acknowledgmentPkt.setRoundTripTime(this.roundTripTime);
        acknowledgmentPkt.setRoundTripTimeVar(this.roundTripTimeVar);
        acknowledgmentPkt.setBufferSize(this.bufferSize);
        acknowledgmentPkt.setDestinationID(this.session.getDestination().getSocketID());
        acknowledgmentPkt.setSession(this.session);
        return acknowledgmentPkt;
    }

    protected void onAck2PacketReceived(Acknowledgment2 ack2) {
        AckHistoryEntry entry = this.ackHistoryWindow.getEntry(ack2.getAckSequenceNumber());
        if (entry != null) {
            long ackNumber = entry.getAckNumber();
            this.largestAcknowledgedAckNumber = Math.max(ackNumber, this.largestAcknowledgedAckNumber);
            long rtt = entry.getAge();
            this.roundTripTime = this.roundTripTime > 0L ? (this.roundTripTime * 7L + rtt) / 8L : rtt;
            this.roundTripTimeVar = (this.roundTripTimeVar * 3L + Math.abs(this.roundTripTimeVar - rtt)) / 4L;
            this.nakTimerInterval = this.ackTimerInterval = 4L * this.roundTripTime + this.roundTripTimeVar + Util.getSYNTime();
            this.statistics.setRTT(this.roundTripTime, this.roundTripTimeVar);
        }
    }

    protected void sendKeepAlive() throws IOException {
        KeepAlive ka = new KeepAlive();
        ka.setDestinationID(this.session.getDestination().getSocketID());
        ka.setSession(this.session);
        this.endpoint.doSend(ka);
    }

    protected void sendShutdown() throws IOException {
        Shutdown s = new Shutdown();
        s.setDestinationID(this.session.getDestination().getSocketID());
        s.setSession(this.session);
        this.endpoint.doSend(s);
    }

    protected void resetEXPTimer() {
        this.nextEXP = Util.getCurrentTime() + this.expTimerInterval;
        this.expCount = 0L;
    }

    protected void resetEXPCount() {
        this.expCount = 0L;
    }

    public void setAckInterval(long ackInterval) {
        this.ackInterval = ackInterval;
    }

    protected void onShutdown() throws IOException {
        this.session.getSocket().close();
        this.stop();
    }

    public void stop() throws IOException {
        this.stopped = true;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("UDTReceiver ").append(this.session).append("\n");
        sb.append("LossList: " + this.receiverLossList);
        return sb.toString();
    }
}

