/*
 * Decompiled with CFR 0.152.
 */
package org.webpieces.httpclient.impl;

import com.webpieces.util.acking.AckAggregator;
import com.webpieces.util.acking.ByteAckTracker;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.webpieces.data.api.DataWrapper;
import org.webpieces.data.api.DataWrapperGenerator;
import org.webpieces.data.api.DataWrapperGeneratorFactory;
import org.webpieces.httpclient.api.DataWriter;
import org.webpieces.httpclient.api.HttpDataWriter;
import org.webpieces.httpclient.api.HttpFullRequest;
import org.webpieces.httpclient.api.HttpFullResponse;
import org.webpieces.httpclient.api.HttpResponseListener;
import org.webpieces.httpclient.api.HttpSocket;
import org.webpieces.httpclient.impl.CatchResponseListener;
import org.webpieces.httpclient.impl.CompletableListener;
import org.webpieces.httpclient.impl.HttpChunkWriterImpl;
import org.webpieces.httpparser.api.HttpParser;
import org.webpieces.httpparser.api.MarshalState;
import org.webpieces.httpparser.api.Memento;
import org.webpieces.httpparser.api.dto.HttpData;
import org.webpieces.httpparser.api.dto.HttpPayload;
import org.webpieces.httpparser.api.dto.HttpRequest;
import org.webpieces.httpparser.api.dto.HttpResponse;
import org.webpieces.nio.api.channels.Channel;
import org.webpieces.nio.api.channels.TCPChannel;
import org.webpieces.nio.api.handlers.DataListener;
import org.webpieces.nio.api.handlers.RecordingDataListener;
import org.webpieces.util.logging.Logger;
import org.webpieces.util.logging.LoggerFactory;

public class HttpSocketImpl
implements HttpSocket {
    private static final Logger log = LoggerFactory.getLogger(HttpSocketImpl.class);
    private static DataWrapperGenerator wrapperGen = DataWrapperGeneratorFactory.createDataWrapperGenerator();
    private TCPChannel channel;
    private boolean isClosed;
    private boolean connected;
    private HttpParser parser;
    private Memento memento;
    private ConcurrentLinkedQueue<HttpResponseListener> responsesToComplete = new ConcurrentLinkedQueue();
    private DataListener dataListener = new MyDataListener();
    private boolean isRecording = false;
    private MarshalState state;

    public HttpSocketImpl(TCPChannel channel, HttpParser parser) {
        this.channel = channel;
        this.parser = parser;
        this.memento = parser.prepareToParse();
        this.state = parser.prepareToMarshal();
    }

    @Override
    public CompletableFuture<Void> connect(InetSocketAddress addr) {
        if (this.isRecording) {
            this.dataListener = new RecordingDataListener("httpSock-", this.dataListener);
        }
        return this.channel.connect((SocketAddress)addr, this.dataListener).thenApply(channel -> this.connected());
    }

    @Override
    public CompletableFuture<HttpFullResponse> send(HttpFullRequest request) {
        CompletableFuture<HttpFullResponse> future = new CompletableFuture<HttpFullResponse>();
        CompletableListener l = new CompletableListener(future);
        HttpData data = request.getData();
        this.send(request.getRequest(), l).thenCompose(w -> w.send(data));
        return future;
    }

    private Void connected() {
        this.connected = true;
        return null;
    }

    @Override
    public CompletableFuture<HttpDataWriter> send(HttpRequest request, HttpResponseListener listener) {
        if (!this.connected) {
            throw new IllegalStateException("The socket is not yet connected");
        }
        return this.actuallySendRequest(request, listener);
    }

    private CompletableFuture<HttpDataWriter> actuallySendRequest(HttpRequest request, HttpResponseListener listener) {
        CatchResponseListener l = new CatchResponseListener(listener);
        ByteBuffer wrap = this.parser.marshalToByteBuffer(this.state, (HttpPayload)request);
        this.responsesToComplete.offer(l);
        return this.channel.write(wrap).thenApply(v -> new HttpChunkWriterImpl(this.channel, this.parser, this.state));
    }

    @Override
    public CompletableFuture<Void> close() {
        if (this.isClosed) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture future = this.channel.close();
        return future.thenApply(chan -> {
            this.isClosed = true;
            return null;
        });
    }

    public String toString() {
        return "HttpSocketImpl [channel=" + this.channel + "]";
    }

    private class MyDataListener
    implements DataListener {
        private ByteAckTracker tracker = new ByteAckTracker();
        private CompletableFuture<DataWriter> future;

        private MyDataListener() {
        }

        public CompletableFuture<Void> incomingData(Channel channel, ByteBuffer b) {
            DataWrapper wrapper = wrapperGen.wrapByteBuffer(b);
            int bytesIn = b.remaining();
            int totalBytes = HttpSocketImpl.this.memento.getLeftOverData().getReadableSize() + b.remaining();
            HttpSocketImpl.this.memento = HttpSocketImpl.this.parser.parse(HttpSocketImpl.this.memento, wrapper);
            int totalBytesParsed = totalBytes - HttpSocketImpl.this.memento.getLeftOverData().getReadableSize();
            List parsedMessages = HttpSocketImpl.this.memento.getParsedMessages();
            AckAggregator ack = this.tracker.createTracker(bytesIn, parsedMessages.size(), totalBytesParsed);
            for (HttpPayload msg : parsedMessages) {
                if (msg instanceof HttpData) {
                    HttpData data = (HttpData)msg;
                    if (data.isEndOfData()) {
                        HttpSocketImpl.this.responsesToComplete.poll();
                    }
                    this.future.thenCompose(w -> w.incomingData(data).handle((v, t) -> (Void)ack.ack(v, t)));
                    continue;
                }
                if (msg instanceof HttpResponse) {
                    this.future = this.processResponse((HttpResponse)msg).handle((w, t) -> (DataWriter)ack.ack(w, t));
                    continue;
                }
                throw new IllegalStateException("invalid payload received=" + msg);
            }
            return ack.getAckBytePayloadFuture();
        }

        private CompletableFuture<DataWriter> processResponse(HttpResponse msg) {
            if (msg.isHasChunkedTransferHeader() || msg.isHasNonZeroContentLength()) {
                HttpResponse resp = msg;
                HttpResponseListener listener = (HttpResponseListener)HttpSocketImpl.this.responsesToComplete.peek();
                return listener.incomingResponse(resp, false);
            }
            HttpResponse resp = msg;
            HttpResponseListener listener = (HttpResponseListener)HttpSocketImpl.this.responsesToComplete.poll();
            return listener.incomingResponse(resp, true);
        }

        public void farEndClosed(Channel channel) {
            log.info("far end closed");
            HttpSocketImpl.this.isClosed = true;
        }

        public void failure(Channel channel, ByteBuffer data, Exception e) {
            log.error("Failure on channel=" + channel, (Throwable)e);
            while (!HttpSocketImpl.this.responsesToComplete.isEmpty()) {
                HttpResponseListener listener = (HttpResponseListener)HttpSocketImpl.this.responsesToComplete.poll();
                if (listener == null) continue;
                listener.failure(e);
            }
        }
    }
}

