/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.http.rest;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import org.nustaq.kontraktor.remoting.ObjectSocket;
import org.nustaq.kontraktor.remoting.RemoteCallEntry;
import org.nustaq.kontraktor.remoting.http.ArgTypesResolver;
import org.nustaq.kontraktor.remoting.http.rest.HttpRemotedCB;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kson.Kson;
import org.nustaq.kson.KsonCharInput;
import org.nustaq.kson.KsonDeserializer;
import org.nustaq.kson.KsonStringCharInput;

/**
 * {@link ObjectSocket} implementation that transports objects as Kson text over
 * plain HTTP POST requests.
 *
 * <p>Objects passed to {@link #writeObject(Object)} are serialized to Kson and
 * queued. A background thread (started from {@link #init()}) batches the queued
 * strings, posts each batch to {@code host:port} under {@code actorPath}, parses
 * the response body into objects and queues them for {@link #readObject()}.
 */
public class HttpObjectSocket
implements ObjectSocket {
    /** Upper bound on the number of additional queued requests drained into one POST. */
    public static final int MAX_BATCHED_REQUESTS = 500;
    Class actorClz;
    int port = 9999;
    String host;
    String actorPath;
    Kson kson;
    ArgTypesResolver resolver;
    /** Serialized (Kson) requests waiting to be posted by the worker thread. */
    LinkedBlockingQueue<String> httpReqQueue;
    /** Deserialized responses waiting to be handed out by {@link #readObject()}. */
    LinkedBlockingQueue<Object> httpRespQueue;

    /**
     * Creates the socket and immediately starts its worker thread.
     *
     * @param actorClz  class handed to the {@link ArgTypesResolver}
     * @param port      TCP port of the remote HTTP endpoint
     * @param host      host name or address of the remote HTTP endpoint
     * @param actorPath request path used in the POST request line
     */
    public HttpObjectSocket(Class actorClz, int port, String host, String actorPath) {
        this.actorClz = actorClz;
        this.port = port;
        this.host = host;
        this.actorPath = actorPath;
        // NOTE(review): init() is protected and therefore overridable; an override
        // runs before the subclass constructor body. Kept for compatibility.
        this.init();
    }

    /**
     * Configures the Kson mapper, creates the request/response queues and starts
     * the worker thread that performs the HTTP round trips.
     */
    protected void init() {
        this.kson = new Kson().map("call", RemoteCallEntry.class).map("calls", RemoteCallEntry[].class).map("rcb", HttpRemotedCB.class);
        this.kson.getMapper().setUseSimplClzName(false);
        this.resolver = new ArgTypesResolver(this.actorClz);
        this.httpReqQueue = new LinkedBlockingQueue<>();
        this.httpRespQueue = new LinkedBlockingQueue<>();
        new Thread(this::runRequestLoop, "ObjectSocket:" + this.host + ":" + this.port + this.actorPath).start();
    }

    /**
     * Worker loop: waits for queued requests, posts them as one batch and feeds
     * the parsed responses into {@link #httpRespQueue}. Never returns; any
     * exception of a single round trip is logged and the loop continues.
     */
    private void runRequestLoop() {
        ArrayList<String> calls = new ArrayList<String>();
        while (true) {
            try {
                // Only block for new work when nothing is pending. 'calls' can still
                // hold unsent requests if a previous iteration was aborted (e.g. the
                // retry sleep was interrupted); those must not wait for new traffic.
                if (calls.isEmpty()) {
                    calls.add(this.httpReqQueue.take());
                }
                this.httpReqQueue.drainTo(calls, MAX_BATCHED_REQUESTS);
                // try-with-resources guarantees the socket is closed even when
                // reading or deserializing the response fails.
                try (Socket socket = this.postWithRetry(calls)) {
                    this.readResponses(socket);
                }
            }
            catch (Exception e) {
                // NOTE(review): this also catches InterruptedException, so interrupting
                // the thread does not stop it (same as before); close() is a no-op.
                Log.Warn(this, e, "");
            }
        }
    }

    /**
     * Posts the batch, retrying every 100 ms until the request has been written
     * successfully. On success {@code calls} is cleared.
     *
     * @param calls pending serialized requests; emptied once they have been sent
     * @return the connected socket from which the response can be read
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    private Socket postWithRetry(ArrayList<String> calls) throws InterruptedException {
        Socket socket = null;
        while (calls.size() > 0) {
            try {
                socket = this.post(calls);
                calls.clear();
            }
            catch (Exception ex) {
                // Failure is deliberately not logged per attempt; the batch is
                // simply retried after a short pause.
                Thread.sleep(100L);
            }
        }
        return socket;
    }

    /**
     * Reads the HTTP response from {@code socket}, deserializes every Kson object
     * in the body and puts each non-null object on {@link #httpRespQueue}.
     *
     * @param socket socket returned by {@link #post(ArrayList)}; not closed here
     * @throws Exception on I/O or deserialization failure, or if interrupted while queueing
     */
    private void readResponses(Socket socket) throws Exception {
        BufferedReader read = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF8"));
        // First line of the response (presumably the HTTP status line) is discarded.
        read.readLine();
        // NOTE(review): exactly one more line is skipped before the body is read,
        // which appears to assume the server sends no header lines - confirm.
        read.readLine();
        StringBuilder sb = new StringBuilder(100);
        int ch;
        // NOTE(review): '> 0' ends the body at end-of-stream (-1) and also at a NUL
        // character; kept unchanged because the server side is not visible here.
        while ((ch = read.read()) > 0) {
            sb.append((char)ch);
        }
        String ksonString = sb.toString();
        if (ksonString.length() == 0) {
            return;
        }
        KsonStringCharInput in = new KsonStringCharInput((CharSequence)ksonString);
        KsonDeserializer deserializer = new KsonDeserializer((KsonCharInput)in, this.kson.getMapper());
        // The body may contain several objects back to back; read until consumed.
        while (in.position() < ksonString.length()) {
            Object o = deserializer.readObject(RemoteCallEntry.class, String.class, null);
            if (o != null) {
                this.httpRespQueue.put(o);
            }
            deserializer.skipWS();
        }
    }

    /**
     * @return the Kson instance used to serialize requests and parse responses
     */
    public Kson getKson() {
        return this.kson;
    }

    /**
     * Blocks until a response object is available and returns it.
     *
     * @return the next object received from the remote side
     * @throws Exception if interrupted while waiting
     */
    @Override
    public Object readObject() throws Exception {
        return this.httpRespQueue.take();
    }

    /**
     * Serializes {@code toWrite} to Kson and queues it for the worker thread.
     *
     * @param toWrite object to send
     * @throws Exception if serialization fails or the thread is interrupted
     */
    @Override
    public void writeObject(Object toWrite) throws Exception {
        String sendString = this.kson.writeObject(toWrite, false);
        this.httpReqQueue.put(sendString);
    }

    /** No-op: requests are sent asynchronously by the worker thread. */
    @Override
    public void flush() throws IOException {
    }

    /**
     * Opens a connection and writes one HTTP/1.0 POST whose body is the given
     * requests concatenated between {@code "[\n"} and {@code "]"}.
     *
     * @param requests serialized requests to send in this batch
     * @return the connected socket, ready for the response to be read
     * @throws IOException if resolving, connecting or writing fails; the socket is
     *                     closed before the exception is propagated
     */
    private Socket post(ArrayList<String> requests) throws IOException {
        InetAddress addr = InetAddress.getByName(this.host);
        StringBuilder body = new StringBuilder("[\n");
        for (String request : requests) {
            body.append(request);
        }
        body.append("]");
        String post = body.toString();
        // Content-Length counts bytes of the encoded body, not Java chars;
        // the two differ as soon as the payload contains non-ASCII characters.
        int contentLength = post.getBytes("UTF8").length;
        Socket socket = new Socket(addr, this.port);
        try {
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF8"));
            bw.write("POST " + this.actorPath + " HTTP/1.0\n");
            bw.write("Content-Length: " + contentLength + "\n");
            bw.write("Accept: text/kson\n");
            bw.write("Content-Type: application/kson\n");
            bw.write("\n");
            bw.write(post);
            bw.flush();
            return socket;
        }
        catch (IOException | RuntimeException ex) {
            // Do not leak the connection when the request could not be written.
            try {
                socket.close();
            }
            catch (IOException closeEx) {
                ex.addSuppressed(closeEx);
            }
            throw ex;
        }
    }

    /** No-op: the last error is not recorded by this implementation. */
    @Override
    public void setLastError(Exception ex) {
    }

    /**
     * No-op.
     * NOTE(review): the worker thread started in {@link #init()} is not stopped here.
     */
    @Override
    public void close() throws IOException {
    }
}

