/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.http;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.messagestore.HeapMessageStore;
import org.nustaq.kontraktor.remoting.base.messagestore.MessageStore;
import org.nustaq.kontraktor.remoting.http.KHttpExchange;
import org.nustaq.kontraktor.remoting.websockets.WebObjectSocket;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.nustaq.offheap.BinaryQueue;
import org.nustaq.offheap.bytez.ByteSource;
import org.nustaq.offheap.bytez.onheap.HeapBytez;
import org.nustaq.serialization.util.FSTUtil;

public class HttpObjectSocket
extends WebObjectSocket
implements ObjectSink {
    private static final boolean LP_DEBUG = false;
    static AtomicInteger idCount = new AtomicInteger(0);
    int id = idCount.incrementAndGet();
    public static int LP_TIMEOUT = 15000;
    public static int HISTORY_SIZE = 3;
    public static int HTTP_BATCH_SIZE = 500;
    final Runnable closeAction;
    long lastUse;
    long creation = this.lastUse = System.currentTimeMillis();
    String sessionId;
    BinaryQueue queue = new BinaryQueue(4096);
    ObjectSink sink;
    MessageStore store = new HeapMessageStore(HISTORY_SIZE);
    volatile Pair<Runnable, KHttpExchange> longPollTask;
    Thread myThread;
    volatile long longPollTaskTime;
    AtomicInteger triggerPending = new AtomicInteger(0);

    public HttpObjectSocket(String sessionId, Runnable closeAction) {
        this.sessionId = sessionId;
        this.closeAction = closeAction;
    }

    public String getSessionId() {
        return this.sessionId;
    }

    public void updateTimeStamp() {
        this.lastUse = System.currentTimeMillis();
    }

    public long getLastUse() {
        return this.lastUse;
    }

    @Override
    public void sendBinary(byte[] message) {
        this.checkThread();
        this.queue.addInt(this.sendSequence.get());
        this.queue.addInt(message.length);
        this.queue.add((ByteSource)new HeapBytez(message));
        this.triggerLongPoll();
    }

    @Override
    public void writeObject(Object toWrite) throws Exception {
        this.checkThread();
        super.writeObject(toWrite);
    }

    public void close() throws IOException {
        if (this.closeAction != null) {
            this.closeAction.run();
        }
    }

    public int getId() {
        return this.id;
    }

    public void setSink(ObjectSink sink) {
        this.sink = sink;
    }

    public ObjectSink getSink() {
        return this;
    }

    public Pair<byte[], Integer> getNextQueuedMessage() {
        this.checkThread();
        try {
            this.flush();
        }
        catch (Exception e) {
            FSTUtil.rethrow((Throwable)e);
        }
        if (this.queue.available() > 8L) {
            int seq = this.queue.readInt();
            int len = this.queue.readInt();
            if (len > 0 && this.queue.available() >= (long)len) {
                return new Pair((Object)this.queue.readByteArray(len), (Object)seq);
            }
            return new Pair((Object)new byte[0], (Object)0);
        }
        return new Pair((Object)new byte[0], (Object)0);
    }

    protected void checkThread() {
        if (this.myThread == null) {
            this.myThread = Thread.currentThread();
        } else if (this.myThread != Thread.currentThread()) {
            Log.Error((Object)this, (String)("unexpected multithreading detected:" + this.myThread.getName() + " curr:" + Thread.currentThread().getName()));
            Thread.dumpStack();
        }
    }

    public Pair<Runnable, KHttpExchange> getLongPollTask() {
        return this.longPollTask;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelLongPoll() {
        HttpObjectSocket httpObjectSocket = this;
        synchronized (httpObjectSocket) {
            if (this.longPollTask != null) {
                ((KHttpExchange)this.longPollTask.cdr()).endExchange();
                this.longPollTask = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void triggerLongPoll() {
        HttpObjectSocket httpObjectSocket = this;
        synchronized (httpObjectSocket) {
            if (this.longPollTask != null) {
                if (this.triggerPending.get() > 0) {
                    this.triggerPending.decrementAndGet();
                }
                Runnable car = (Runnable)this.longPollTask.car();
                this.longPollTask = null;
                car.run();
            } else {
                this.triggerPending.incrementAndGet();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setLongPollTask(Pair<Runnable, KHttpExchange> longPollTask) {
        HttpObjectSocket httpObjectSocket = this;
        synchronized (httpObjectSocket) {
            this.longPollTask = longPollTask;
            this.longPollTaskTime = System.currentTimeMillis();
            if (this.triggerPending.get() > 0) {
                this.triggerLongPoll();
            }
        }
    }

    public long getLongPollTaskTime() {
        return this.longPollTaskTime;
    }

    public void receiveObject(ObjectSink asink, Object received, List<IPromise> createdFutures, Object securityContext) {
        this.sink.receiveObject(asink, received, createdFutures, securityContext);
    }

    public void sinkClosed() {
        this.sink.sinkClosed();
    }

    public Object takeStoredLPMessage(int seq) {
        return this.store.getMessage((CharSequence)"sen", (long)seq);
    }

    public void storeLPMessage(int inSequence, Object msg) {
        this.store.putMessage((CharSequence)"sen", (long)inSequence, msg);
    }

    @Override
    public void flush() throws Exception {
        if (this.objects.size() == 0) {
            return;
        }
        this.objects.add(this.sendSequence.incrementAndGet());
        Object[] objArr = this.objects.toArray();
        this.objects.clear();
        this.sendBinary(this.conf.asByteArray((Object)objArr));
    }
}

