/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.http;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.ParseException;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorClientConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.http.HttpConnectable;
import org.nustaq.kontraktor.remoting.http.HttpObjectSocket;
import org.nustaq.kontraktor.remoting.websockets.WebObjectSocket;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.serialization.FSTConfiguration;

public class HttpClientConnector
implements ActorClientConnector {
    // Connection limits handed to the shared async client when getClient() builds it.
    // They are only read at that moment; changing them afterwards does not affect an existing client.
    public static int MAX_CONN_TOTAL = 1000;
    public static int MAX_CONN_PER_ROUTE = 1000;
    // When true, request and response payloads are printed to System.out.
    public static boolean DumpProtocol = false;
    // Shared HTTP client, created and started on first use by getClient().
    protected static CloseableHttpAsyncClient asyncHttpClient;
    // Session id decoded from the auth response in connect(); appended to the base URL for later requests.
    String sessionId;
    // JSON configuration used to encode authData and to decode the auth response.
    FSTConfiguration authConf = FSTConfiguration.createJsonConfiguration();
    // Set by closeClient(); the poll loop checks it and stops rescheduling itself once it is true.
    volatile boolean isClosed = false;
    // Created by closeClient(); completed and reset to null by the poll loop when it observes isClosed.
    Promise closedNotification;
    // Invoked from closeClient() when non-null. Not assigned anywhere in this file.
    Callback<ActorClientConnector> disconnectCallback;
    // Connection settings supplied to the constructor (url, poll mode, poll interval).
    HttpConnectable cfg;
    // Delay currently used between short polls; lowered to 200 ms by MyHttpWS.writeObject and
    // doubled back up to cfg.shortPollIntervalMS by the poll loop.
    long currentShortPollIntervalMS;
    // Serialized with authConf and sent as the body of the connect request.
    public Object[] authData;
    // The self-rescheduling poll task installed by startLongPoll().
    Runnable pollRunnable;
    // "Cache-Control: no-cache" header added to every request sent by this class.
    static final Header NO_CACHE;
    // Actor on which long-poll and send responses are processed (see getReceiveActor()).
    static HttpClientActor singletonRec;
    // Actor used for short-poll scheduling and post-response flushes (see getRefPollActor()).
    static HttpClientActor singletonRefPoll;

    /**
     * Returns the process-wide async HTTP client, building and starting it on first use.
     *
     * <p>Creation is guarded by the {@code HttpClientConnector} class lock. The connection
     * limits are taken from {@link #MAX_CONN_PER_ROUTE} and {@link #MAX_CONN_TOTAL} at the
     * moment the client is built.
     *
     * @return the shared, started client
     */
    public static synchronized CloseableHttpAsyncClient getClient() {
        if (asyncHttpClient != null) {
            return asyncHttpClient;
        }
        IOReactorConfig reactorConfig = IOReactorConfig.custom()
            .setIoThreadCount(1)
            .setSoKeepAlive(true)
            .setSoReuseAddress(true)
            .build();
        asyncHttpClient = HttpAsyncClients.custom()
            .setMaxConnPerRoute(MAX_CONN_PER_ROUTE)
            .setMaxConnTotal(MAX_CONN_TOTAL)
            .setDefaultIOReactorConfig(reactorConfig)
            .build();
        asyncHttpClient.start();
        return asyncHttpClient;
    }

    public HttpClientConnector(HttpConnectable httpConnectable) {
        this.cfg = httpConnectable;
        this.currentShortPollIntervalMS = this.cfg.getShortPollIntervalMS();
    }

    /**
     * Authenticates against {@code cfg.getUrl()} and, on success, wires up the object socket.
     *
     * <p>{@code authData} is serialized with {@code authConf} and POSTed. A 200 response with
     * a non-empty body is decoded as the session id; the socket is then created on the
     * calling actor, handed to {@code factory}, and polling is started.
     *
     * <p>Every failure path settles the returned promise, so a malformed response or a
     * failing {@code factory} cannot leave the caller waiting forever.
     *
     * @param factory creates the sink that receives objects read from the new socket
     * @return a promise resolved once the socket is set up, or rejected with the HTTP status
     *         code, an exception, or a message string
     * @throws Exception if the request cannot be built or submitted
     */
    public IPromise connect(Function<ObjectSocket, ObjectSink> factory) throws Exception {
        final Promise res = new Promise();
        byte[] message = this.authConf.asByteArray((Object)this.authData);
        if (DumpProtocol) {
            System.out.println("auth-req:" + new String(message, StandardCharsets.UTF_8));
        }
        HttpPost req = new HttpPost(this.cfg.getUrl());
        req.addHeader(NO_CACHE);
        req.setEntity((HttpEntity)new ByteArrayEntity(message));
        // Socket setup must run on the actor that initiated the connect, not on the HTTP I/O thread.
        final Executor actor = (Executor)Actor.current();
        HttpClientConnector.getClient().execute((HttpUriRequest)req, new FutureCallback<HttpResponse>(){

            @Override
            public void completed(HttpResponse result) {
                int status = result.getStatusLine().getStatusCode();
                if (status != 200) {
                    HttpClientConnector.this.closeClient();
                    res.reject((Object)Integer.valueOf(status));
                    return;
                }
                final byte[] resp;
                try {
                    resp = HttpClientConnector.readAuthResponseBody(result);
                }
                catch (Exception e) {
                    // Covers a missing or unparseable Content-Length as well as I/O errors.
                    Log.Warn((Object)HttpClientConnector.this, (Throwable)e);
                    res.reject((Object)e);
                    return;
                }
                if (resp.length == 0) {
                    res.reject((Object)"connection rejected, no connection id");
                    return;
                }
                if (DumpProtocol) {
                    System.out.println("auth-resp:" + new String(resp, StandardCharsets.UTF_8));
                }
                try {
                    actor.execute(() -> {
                        try {
                            HttpClientConnector.this.sessionId = (String)HttpClientConnector.this.authConf.asObject(resp);
                            MyHttpWS myHttpWS = new MyHttpWS(HttpClientConnector.this.cfg.getUrl() + "/" + HttpClientConnector.this.sessionId);
                            ObjectSink sink = factory.apply(myHttpWS);
                            myHttpWS.setSink(sink);
                            HttpClientConnector.this.startLongPoll(myHttpWS);
                        }
                        catch (Exception e) {
                            Log.Warn((Object)HttpClientConnector.this, (Throwable)e);
                            res.reject((Object)e);
                            return;
                        }
                        // Resolved outside the try so a failing listener is not answered with a second settle.
                        res.resolve();
                    });
                }
                catch (Exception e) {
                    Log.Warn((Object)HttpClientConnector.this, (Throwable)e);
                    res.reject((Object)e);
                }
            }

            @Override
            public void failed(Exception ex) {
                res.reject((Object)ex);
            }

            @Override
            public void cancelled() {
                res.reject((Object)"canceled");
            }
        });
        return res;
    }

    /**
     * Reads the body of the auth response as announced by its {@code Content-Length} header.
     *
     * @param response the completed auth response
     * @return the body bytes, or an empty array when the announced length is not positive
     * @throws IOException if the header is missing or the stream ends before the announced length
     */
    private static byte[] readAuthResponseBody(HttpResponse response) throws IOException {
        Header lengthHeader = response.getFirstHeader("Content-Length");
        if (lengthHeader == null) {
            throw new IOException("auth response has no Content-Length header");
        }
        int len = Integer.parseInt(lengthHeader.getValue());
        if (len <= 0) {
            return new byte[0];
        }
        byte[] body = new byte[len];
        InputStream in = response.getEntity().getContent();
        int filled = 0;
        // A single read() may return fewer bytes than requested, so loop until the buffer is full.
        while (filled < len) {
            int n = in.read(body, filled, len - filled);
            if (n <= 0) {
                throw new EOFException("auth response ended after " + filled + " of " + len + " bytes");
            }
            filled += n;
        }
        return body;
    }

    /**
     * Installs and starts the self-rescheduling poll task for the given socket.
     *
     * <p>Does nothing when {@code cfg.noPoll} is set. In short-poll mode the task runs on the
     * ref-poll actor: after the current interval it queues an {@code "SP"} marker on the socket
     * and reschedules itself. Otherwise it runs on the receive actor and issues a long poll,
     * re-running immediately on success and after one second on error. Both variants stop once
     * {@code isClosed} is observed and then complete a pending {@code closedNotification}.
     *
     * @param myHttpWS the socket to poll on
     */
    protected void startLongPoll(MyHttpWS myHttpWS) {
        if (this.cfg.noPoll) {
            return;
        }
        this.currentShortPollIntervalMS = this.cfg.shortPollIntervalMS;
        this.pollRunnable = () -> {
            if (!this.cfg.shortPollMode) {
                myHttpWS.longPoll().then((Callback & Serializable)(r, e) -> {
                    if (this.completeClosedNotificationIfClosed()) {
                        return;
                    }
                    if (e != null) {
                        // Back off for a second after a failed long poll.
                        Actor.current().delayed(1000L, this.pollRunnable);
                    } else {
                        Actor.current().execute(this.pollRunnable);
                    }
                });
                return;
            }
            HttpClientConnector.getRefPollActor().delayed(this.currentShortPollIntervalMS, () -> {
                if (this.completeClosedNotificationIfClosed()) {
                    return;
                }
                try {
                    myHttpWS.writeObject("SP");
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Actor.current().delayed(this.currentShortPollIntervalMS, this.pollRunnable);
                // Grow a shortened interval back towards the configured one, doubling each round.
                if (this.currentShortPollIntervalMS < this.cfg.shortPollIntervalMS) {
                    long doubled = this.currentShortPollIntervalMS * 2L;
                    this.currentShortPollIntervalMS = Math.min(this.cfg.shortPollIntervalMS, doubled);
                }
            });
        };
        if (this.cfg.shortPollMode) {
            HttpClientConnector.getRefPollActor().execute(this.pollRunnable);
        } else {
            HttpClientConnector.getReceiveActor().getCurrentDispatcher().setName("Http LP dispatcher");
            HttpClientConnector.getReceiveActor().execute(this.pollRunnable);
        }
    }

    /**
     * If the connector is closed, completes and clears a pending {@code closedNotification}.
     *
     * @return {@code true} when the connector is closed and polling must stop
     */
    private boolean completeClosedNotificationIfClosed() {
        if (!this.isClosed) {
            return false;
        }
        if (this.closedNotification != null) {
            this.closedNotification.complete();
            this.closedNotification = null;
        }
        return true;
    }

    /**
     * Marks this connector as closed and informs the disconnect callback, if one is set.
     *
     * <p>The returned promise is also stored in {@code closedNotification}; the poll loop
     * completes it the next time it observes {@code isClosed}.
     * NOTE(review): if no poll loop is running (for example {@code cfg.noPoll}, or a close
     * before polling started), nothing in this class completes the promise.
     *
     * @return the promise that the poll loop completes once it has seen the close
     */
    public IPromise closeClient() {
        // Return a local reference rather than re-reading the field: the poll loop may complete
        // the promise and reset the field to null before this method reaches its return.
        Promise notification = new Promise();
        this.closedNotification = notification;
        // isClosed is volatile and is written after closedNotification, so a poller that sees
        // the flag also sees the promise.
        this.isClosed = true;
        if (this.disconnectCallback != null) {
            this.disconnectCallback.complete((Object)this, null);
        }
        Log.Info((Object)this, (String)"connection closing");
        return notification;
    }

    /**
     * Returns the shared actor on which HTTP responses are processed.
     *
     * <p>The static initializer already assigns the singleton, so the creation branch here
     * only runs if the field has been reset to {@code null}. Access is guarded by the
     * {@code HttpClientConnector} class lock.
     *
     * @return the shared receive actor
     */
    public static synchronized HttpClientActor getReceiveActor() {
        if (singletonRec != null) {
            return singletonRec;
        }
        singletonRec = (HttpClientActor)Actors.AsActor(HttpClientActor.class);
        return singletonRec;
    }

    /**
     * Returns the shared actor used for short-poll scheduling and follow-up flushes.
     *
     * <p>When the singleton has to be created here, {@code init()} is called on it as well.
     * Access is guarded by the {@code HttpClientConnector} class lock.
     *
     * @return the shared ref-poll actor
     */
    public static synchronized HttpClientActor getRefPollActor() {
        if (singletonRefPoll != null) {
            return singletonRefPoll;
        }
        HttpClientActor created = (HttpClientActor)Actors.AsActor(HttpClientActor.class);
        singletonRefPoll = created;
        singletonRefPoll.init();
        return singletonRefPoll;
    }

    static {
        // Hand-rolled "Cache-Control: no-cache" header; it exposes no parsed elements.
        NO_CACHE = new Header(){

            @Override
            public String getName() {
                return "Cache-Control";
            }

            @Override
            public String getValue() {
                return "no-cache";
            }

            @Override
            public HeaderElement[] getElements() throws ParseException {
                return new HeaderElement[0];
            }
        };
        singletonRec = (HttpClientActor)Actors.AsActor(HttpClientActor.class);
        singletonRefPoll = (HttpClientActor)Actors.AsActor(HttpClientActor.class);
        // getRefPollActor() calls init() only when it creates the actor itself. Because the
        // actor is created eagerly here, that branch never runs, so init() is invoked here
        // to keep the two creation paths consistent.
        // NOTE(review): confirm that invoking init() for the eagerly created actor is intended.
        singletonRefPoll.init();
    }

    /**
     * Actor type backing the shared receive and ref-poll singletons of this connector.
     */
    public static class HttpClientActor
    extends Actor<HttpClientActor> {
        /**
         * Renames the thread that executes this call to {@code "HttpClient RefPolling"}.
         */
        public void init() {
            Thread executingThread = Thread.currentThread();
            executingThread.setName("HttpClient RefPolling");
        }
    }

    class MyHttpWS
    extends WebObjectSocket {
        // Base URL of this session; the last received sequence number is appended per request.
        String url;
        // Receiver for objects decoded from responses; set via setSink().
        ObjectSink sink;
        // Sequence number of the last batch handed to the sink; 0 until the first batch arrives.
        int lastReceivedSequence = 0;
        // Client used for long-poll requests; only assigned when cfg.shortPollMode is false.
        CloseableHttpAsyncClient lpHttpClient;
        // Number of sendBinary() requests currently in flight; flush() sends only when this is 0.
        AtomicInteger openRequests = new AtomicInteger(0);

        public MyHttpWS(String url) {
            this.url = url;
            if (!HttpClientConnector.this.cfg.shortPollMode) {
                this.lpHttpClient = HttpClientConnector.getClient();
            }
        }

        /**
         * POSTs an encoded batch to the server and processes the response asynchronously.
         *
         * <p>While the request is in flight it is counted in {@code openRequests}. On completion
         * the response is processed on the receive actor and a flush is attempted on the
         * ref-poll actor.
         *
         * @param message the encoded batch to send
         */
        @Override
        public void sendBinary(byte[] message) {
            if (DumpProtocol) {
                System.out.println("req:" + new String(message, StandardCharsets.UTF_8));
            }
            // Build the request before counting it, so a failure here cannot leak the counter.
            HttpPost req = new HttpPost(this.url + "/" + this.lastReceivedSequence);
            req.addHeader(NO_CACHE);
            req.setEntity((HttpEntity)new ByteArrayEntity(message));
            this.openRequests.incrementAndGet();
            try {
                HttpClientConnector.getClient().execute((HttpUriRequest)req, new FutureCallback<HttpResponse>(){

                    @Override
                    public void completed(HttpResponse result) {
                        MyHttpWS.this.openRequests.decrementAndGet();
                        // The promise is a throwaway: nobody waits on the outcome of a plain send.
                        Runnable processLPResponse = MyHttpWS.this.getProcessLPRunnable(new Promise(), result);
                        HttpClientConnector.getReceiveActor().execute(processLPResponse);
                        HttpClientConnector.getRefPollActor().execute(() -> {
                            try {
                                MyHttpWS.this.flush();
                            }
                            catch (Exception e) {
                                Log.Warn((Object)MyHttpWS.this, (Throwable)e);
                            }
                        });
                    }

                    @Override
                    public void failed(Exception ex) {
                        Log.Warn((Object)this, (Throwable)ex);
                        MyHttpWS.this.openRequests.decrementAndGet();
                    }

                    @Override
                    public void cancelled() {
                        Log.Warn((Object)this, (String)"request cancelled");
                        MyHttpWS.this.openRequests.decrementAndGet();
                    }
                });
            }
            catch (RuntimeException e) {
                // The request was rejected before submission, so no callback will undo the
                // increment. Without this, flush() would never send again.
                // NOTE(review): assumes a synchronous throw means no callback was or will be invoked.
                this.openRequests.decrementAndGet();
                throw e;
            }
        }

        /**
         * Sends a long-poll request and returns a promise for its processed response.
         *
         * <p>A timer task is scheduled for {@code HttpObjectSocket.LP_TIMEOUT + 1000}. If no
         * outcome of the first attempt has been recorded by then, the task marks that attempt
         * as timed out, so its later callbacks are ignored, and submits the same request once
         * more with a fresh state.
         * NOTE(review): uses {@code lpHttpClient}, which is only assigned outside short-poll mode.
         *
         * @param message encoded long-poll payload
         * @return promise settled when a response has been processed or the request failed
         */
        IPromise longPollSend(byte[] message) {
            final Promise pending = new Promise();
            final HttpPost request = new HttpPost(this.url + "/" + this.lastReceivedSequence);
            request.addHeader(NO_CACHE);
            request.setEntity((HttpEntity)new ByteArrayEntity(message));
            if (DumpProtocol) {
                try {
                    System.out.println("req:" + new String(message, "UTF-8"));
                }
                catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            // 0 = outstanding, 1 = outcome recorded by a callback, 2 = superseded by the retry.
            final AtomicInteger attemptState = new AtomicInteger(0);
            FutureCallback<HttpResponse> firstAttempt = this.getHttpLPFutureCallback(pending, attemptState);
            TimerTask retryOnTimeout = new TimerTask(){

                @Override
                public void run() {
                    if (!attemptState.compareAndSet(0, 2)) {
                        return;
                    }
                    AtomicInteger retryState = new AtomicInteger(0);
                    MyHttpWS.this.lpHttpClient.execute((HttpUriRequest)request, MyHttpWS.this.getHttpLPFutureCallback(pending, retryState));
                }
            };
            Actor.delayedCalls.schedule(retryOnTimeout, HttpObjectSocket.LP_TIMEOUT + 1000);
            this.lpHttpClient.execute((HttpUriRequest)request, firstAttempt);
            return pending;
        }

        /**
         * Builds the callback for one long-poll attempt.
         *
         * <p>Each handler acts only if it is the first to move {@code timedout} from 0 to 1;
         * otherwise the event is dropped.
         *
         * @param p        promise to settle with the outcome of the attempt
         * @param timedout shared state of the attempt; 0 means still outstanding
         * @return the callback to hand to the HTTP client
         */
        private FutureCallback<HttpResponse> getHttpLPFutureCallback(final Promise p, final AtomicInteger timedout) {
            return new FutureCallback<HttpResponse>(){

                public void completed(HttpResponse result) {
                    if (timedout.compareAndSet(0, 1)) {
                        Runnable responseTask = MyHttpWS.this.getProcessLPRunnable(p, result);
                        HttpClientConnector.getReceiveActor().execute(responseTask);
                    }
                }

                public void failed(Exception ex) {
                    if (timedout.compareAndSet(0, 1)) {
                        ex.printStackTrace();
                        p.reject((Object)ex);
                    }
                }

                public void cancelled() {
                    if (timedout.compareAndSet(0, 1)) {
                        System.out.println("cancel");
                        p.reject((Object)"Canceled");
                    }
                }
            };
        }

        /**
         * Creates the task that decodes one HTTP response and forwards its payload to the sink.
         *
         * <p>A 404 closes the connector and rejects {@code p} with {@code "Closed"}. Otherwise
         * the body is read, decoded and passed to the sink. When the decoded object is an array,
         * its last element is taken as the batch sequence number: once a sequence has been
         * recorded, a batch is delivered only if it directly follows the last one.
         *
         * <p>The task always settles {@code p}. An I/O error while reading is logged and the
         * promise is still resolved. A missing {@code Content-Length} header, or a runtime
         * failure while decoding or dispatching, rejects it instead of escaping the task.
         *
         * @param p      promise to settle once the response has been handled
         * @param result the completed HTTP response
         * @return the task, meant to be executed on the receive actor
         */
        protected Runnable getProcessLPRunnable(Promise p, HttpResponse result) {
            return () -> {
                if (result.getStatusLine().getStatusCode() == 404) {
                    HttpClientConnector.this.closeClient();
                    p.reject((Object)"Closed");
                    return;
                }
                try {
                    Header lengthHeader = result.getFirstHeader("Content-Length");
                    if (lengthHeader == null) {
                        Log.Warn((Object)this, (String)"response has no Content-Length header");
                        p.reject((Object)"response has no Content-Length header");
                        return;
                    }
                    int len = Integer.parseInt(lengthHeader.getValue());
                    if (len > 0) {
                        byte[] b = this.readLPResponseBody(result, len);
                        if (DumpProtocol) {
                            System.out.println("resp:" + new String(b, StandardCharsets.UTF_8));
                        }
                        Object o = this.getConf().asObject(b);
                        boolean send = true;
                        if (o instanceof Object[]) {
                            Object[] ar = (Object[])o;
                            int sequence = ((Number)ar[ar.length - 1]).intValue();
                            if (this.lastReceivedSequence > 0) {
                                // Drop duplicates and gaps: only the direct successor is accepted.
                                send = this.lastReceivedSequence == sequence - 1;
                            }
                            if (send) {
                                this.lastReceivedSequence = sequence;
                            }
                        }
                        if (send) {
                            this.sink.receiveObject(o, null);
                        }
                    } else if (DumpProtocol) {
                        System.out.println("resp:<EMPTY>");
                    }
                }
                catch (IOException e) {
                    // Read failures are logged but do not fail the poll; the promise is resolved below.
                    Log.Warn((Object)this, (Throwable)e);
                }
                catch (RuntimeException e) {
                    // Without this, a bad header value or a failing sink would leave p unsettled.
                    Log.Warn((Object)this, (Throwable)e);
                    p.reject((Object)e);
                    return;
                }
                p.resolve();
            };
        }

        /**
         * Reads exactly {@code len} bytes of the response body.
         *
         * @param response the completed response
         * @param len      number of bytes announced by the response, greater than zero
         * @return the body bytes
         * @throws IOException if the stream ends before {@code len} bytes were read
         */
        private byte[] readLPResponseBody(HttpResponse response, int len) throws IOException {
            byte[] body = new byte[len];
            InputStream in = response.getEntity().getContent();
            int filled = 0;
            // A single read() may return fewer bytes than requested, so loop until the buffer is full.
            while (filled < len) {
                int n = in.read(body, filled, len - filled);
                if (n <= 0) {
                    throw new EOFException("response ended after " + filled + " of " + len + " bytes");
                }
                filled += n;
            }
            return body;
        }

        /**
         * Queues an object for sending and flushes once the batch limit is exceeded.
         *
         * <p>For anything other than the {@code "SP"} marker, the short-poll interval is lowered
         * to 200 ms and an {@code "SP"} write is scheduled on the ref-poll actor 100 ms later.
         *
         * @param toWrite the object to queue
         * @throws Exception if flushing fails
         */
        @Override
        public void writeObject(Object toWrite) throws Exception {
            boolean isShortPollMarker = "SP".equals(toWrite);
            if (!isShortPollMarker) {
                HttpClientConnector.this.currentShortPollIntervalMS = 200L;
                HttpClientConnector.getRefPollActor().delayed(100L, () -> {
                    try {
                        this.writeObject("SP");
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            this.objects.add(toWrite);
            int queued = this.objects.size();
            if (queued > this.getObjectMaxBatchSize()) {
                this.flush();
            }
        }

        /**
         * Tells whether another object may be queued right now.
         *
         * @return {@code true} if no request is in flight or the queue is below the batch limit
         */
        public boolean canWrite() {
            if (this.openRequests.get() == 0) {
                return true;
            }
            return this.objects.size() < this.getObjectMaxBatchSize();
        }

        /**
         * Flushes queued objects, but only while no request is in flight.
         *
         * @throws Exception if the underlying flush fails
         */
        @Override
        public void flush() throws Exception {
            if (this.openRequests.get() != 0) {
                return;
            }
            super.flush();
        }

        /**
         * @return the batch limit for this socket, taken from {@code HttpObjectSocket.HTTP_BATCH_SIZE}
         */
        @Override
        protected int getObjectMaxBatchSize() {
            int batchLimit = HttpObjectSocket.HTTP_BATCH_SIZE;
            return batchLimit;
        }

        /**
         * Issues one long-poll request whose payload is a one-element array holding the next
         * send sequence number.
         *
         * @return the promise returned by {@code longPollSend}
         */
        public IPromise longPoll() {
            Object[] payload = new Object[]{this.sendSequence.incrementAndGet()};
            byte[] encoded = this.conf.asByteArray((Object)payload);
            return this.longPollSend(encoded);
        }

        /**
         * Closes the owning connector; the promise returned by {@code closeClient()} is discarded.
         *
         * @throws IOException declared by the signature; not thrown by this implementation itself
         */
        public void close() throws IOException {
            HttpClientConnector owner = HttpClientConnector.this;
            owner.closeClient();
        }

        /**
         * Sets the receiver for objects decoded from responses.
         *
         * @param sink the sink that {@code getProcessLPRunnable} delivers to
         */
        public void setSink(ObjectSink sink) {
            this.sink = sink;
        }

        /**
         * @return the sink currently receiving decoded objects, or {@code null} if none was set
         */
        public ObjectSink getSink() {
            return sink;
        }
    }
}

