/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.straw;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

/**
 * Minimal streaming client that opens a TLS socket to the configured URL, issues a single
 * HTTP GET and feeds each received batch to {@link #handleEvents(String)}.
 *
 * <p>The last offset seen per partition is remembered and, when the instance was created with
 * explicit cursors, sent back in the {@code X-Nakadi-Cursors} header on every (re)connect.
 * When the instance was created with an empty cursor map the header is never sent, even after
 * offsets have been recorded.
 *
 * <p>All network work runs on a single background thread; subclasses customise behaviour by
 * overriding the {@code protected} hooks.
 */
public class Straw {
    public static final int SECOND = 1000;
    public static final String OFFSET_BEGIN = "begin";

    /** Socket read timeout; a stream that is silent for this long is torn down and reopened. */
    private static final int READ_TIMEOUT_MILLIS = 60 * SECOND;
    /** Pause after a failed stream before reconnecting. */
    private static final int RETRY_DELAY_MILLIS = 2 * SECOND;
    private static final int DEFAULT_HTTPS_PORT = 443;
    /** HTTP/1.1 line terminator; must not depend on the platform line separator. */
    private static final String CRLF = "\r\n";
    /** Matches offsets that consist of decimal digits only. */
    private static final Pattern DECIMAL = Pattern.compile("[0-9]+");

    private final ExecutorService _executor = Executors.newSingleThreadExecutor();
    private final URL _url;
    private final Map<String, String> _cursors;
    private final boolean _allPartitions;

    /**
     * Creates a client for the given stream URL.
     *
     * @param url     endpoint to connect to; the port defaults to 443 when the URL has none
     * @param cursors initial partition-to-offset map (copied); an empty map means no cursor
     *                header is sent
     * @throws NullPointerException if {@code url} or {@code cursors} is {@code null}
     */
    public Straw(URL url, Map<String, String> cursors) {
        this._url = Objects.requireNonNull(url, "url");
        Objects.requireNonNull(cursors, "cursors");
        this._cursors = new HashMap<String, String>(cursors);
        this._allPartitions = cursors.isEmpty();
    }

    /**
     * Starts consuming on the background thread. The stream is reopened whenever it ends or
     * fails, until the worker thread is interrupted (see {@link #stop()}).
     */
    public void start() {
        this._executor.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                this.fetchStream();
            }
        });
    }

    /**
     * Requests termination of the background thread by interrupting it. A read that is
     * currently blocked on the socket is not interrupted, so the worker may only notice the
     * request once that read returns or times out. After calling this method the instance
     * cannot be started again.
     */
    public void stop() {
        this._executor.shutdownNow();
    }

    /**
     * Supplies the bearer token for the {@code Authorization} header.
     *
     * @return the token, or {@code null} to send no {@code Authorization} header; this
     *         implementation reads the {@code TOKEN} environment variable
     * @throws Exception if the token cannot be obtained; the current connection attempt fails
     */
    protected String loadToken() throws Exception {
        return System.getenv("TOKEN");
    }

    /**
     * Called once per non-empty batch, before the batch's cursor is recorded. If this method
     * throws, the cursor is not advanced and the stream is reopened.
     *
     * @param json the raw batch text
     * @throws Exception to reject the batch
     */
    protected void handleEvents(String json) throws Exception {
        this.logDebug("handleEvents: " + json);
    }

    /** Writes a debug message to standard output. */
    protected void logDebug(String message) {
        System.out.println("DEBUG: " + message);
    }

    /** Writes an informational message to standard output. */
    protected void logInfo(String message) {
        System.out.println("INFO: " + message);
    }

    /** Reports a failure by printing its stack trace to standard error. */
    protected void logError(Exception e) {
        e.printStackTrace(System.err);
    }

    /**
     * Opens one connection, sends the request and consumes the response until the peer closes
     * the stream or an error occurs. Errors are logged and followed by a short pause; they are
     * never propagated to the caller.
     */
    private void fetchStream() {
        this.logInfo("fetchStream: " + (this._allPartitions ? "all partitions (END)" : this.cursorString()));
        try (SSLSocket socket = (SSLSocket) SSLSocketFactory.getDefault().createSocket(this._url.getHost(), this.port())) {
            socket.setSoTimeout(READ_TIMEOUT_MILLIS);
            this.sendRequest(socket);
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            Straw.skipHeaders(in);

            // Body framing as implemented here: a non-empty line read outside "data mode" is
            // discarded and only marks the NEXT line as payload; an empty line flushes the
            // payload collected so far as one batch.
            // NOTE(review): this looks like a hand-rolled decoder for chunked transfer encoding
            // (chunk-size line, data line, blank line) - confirm against the server's framing.
            StringBuilder batch = new StringBuilder();
            boolean isData = false;
            String line;
            while ((line = in.readLine()) != null) {
                if (isData) {
                    batch.append(line);
                    isData = false;
                    continue;
                }
                if (line.isEmpty()) {
                    this.handleBatch(batch.toString());
                    batch.setLength(0);
                    continue;
                }
                isData = true;
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // A hook was interrupted: keep the flag set so the run loop can terminate.
                Thread.currentThread().interrupt();
            }
            this.logError(e);
            Straw.tryToSleep(RETRY_DELAY_MILLIS);
        }
    }

    /**
     * Delivers a batch to {@link #handleEvents(String)} and then records its cursor if it is
     * ahead of the one stored for that partition. Empty batches are ignored.
     */
    private void handleBatch(String batch) throws Exception {
        if (!batch.isEmpty()) {
            Cursor cursor = Cursor.extract(batch);
            // Deliver first: a failing handler must not advance the stored cursor.
            this.handleEvents(batch);
            if (Straw.offsetGreaterThan(cursor.offset, this._cursors.getOrDefault(cursor.partition, OFFSET_BEGIN))) {
                this._cursors.put(cursor.partition, cursor.offset);
            }
        }
    }

    /**
     * Performs the TLS handshake and writes the HTTP request head.
     *
     * @throws Exception if the handshake, {@link #loadToken()} or the write fails
     */
    private void sendRequest(SSLSocket socket) throws Exception {
        socket.startHandshake();

        StringBuilder request = new StringBuilder();
        request.append("GET ").append(this.requestPath()).append(" HTTP/1.1").append(CRLF);
        request.append("Host: ").append(this.hostHeader()).append(CRLF);
        request.append("User-Agent: straw").append(CRLF);
        if (!this._allPartitions) {
            request.append("X-Nakadi-Cursors: ").append(this.cursorString()).append(CRLF);
        }
        String token = this.loadToken();
        if (token != null) {
            request.append("Authorization: Bearer ").append(token.trim()).append(CRLF);
        }
        request.append(CRLF);

        // Plain writer instead of PrintWriter so that write failures surface as IOException.
        OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
        out.write(request.toString());
        out.flush();
    }

    /** Returns the URL's port, or 443 when the URL does not specify one. */
    private int port() {
        int port = this._url.getPort();
        return port == -1 ? DEFAULT_HTTPS_PORT : port;
    }

    /** Returns the {@code Host} header value; the port is included only when it is not 443. */
    private String hostHeader() {
        int port = this.port();
        String host = this._url.getHost();
        return port == DEFAULT_HTTPS_PORT ? host : host + ":" + port;
    }

    /** Returns the request target: path (or {@code /} when the URL has none) plus optional query. */
    private String requestPath() {
        String path = this._url.getPath();
        if (path == null || path.isEmpty()) {
            path = "/";
        }
        String query = this._url.getQuery();
        return query == null ? path : path + "?" + query;
    }

    /** Renders the stored cursors as a bracketed, comma-separated list of cursor objects. */
    private String cursorString() {
        List<Cursor> result = new ArrayList<Cursor>();
        for (Map.Entry<String, String> entry : this._cursors.entrySet()) {
            result.add(new Cursor(entry.getKey(), entry.getValue()));
        }
        return Arrays.toString(result.toArray());
    }

    /**
     * Consumes the status line and the header lines up to and including the blank separator.
     *
     * @throws HttpException if the stream ends before a status line arrives, the status line
     *                       has no status part, or the status does not start with {@code 200}
     * @throws IOException   if reading fails
     */
    private static void skipHeaders(BufferedReader in) throws IOException, HttpException {
        String statusLine = in.readLine();
        if (statusLine == null) {
            throw new HttpException("connection closed before status line");
        }
        String[] parts = statusLine.split("\\s", 2);
        if (parts.length < 2) {
            throw new HttpException("malformed status line: " + statusLine);
        }
        String status = parts[1];
        if (!status.startsWith("200")) {
            throw new HttpException(status);
        }
        String line;
        while ((line = in.readLine()) != null) {
            if (line.trim().isEmpty()) {
                break;
            }
        }
    }

    /**
     * Tells whether offset {@code a} is ahead of offset {@code b}. {@link #OFFSET_BEGIN} as
     * {@code b} is behind everything. Purely decimal offsets are compared numerically (so
     * {@code "10"} is ahead of {@code "9"}); anything else is compared lexicographically.
     */
    private static boolean offsetGreaterThan(String a, String b) {
        if (OFFSET_BEGIN.equals(b)) {
            return true;
        }
        if (DECIMAL.matcher(a).matches() && DECIMAL.matcher(b).matches()) {
            return new BigInteger(a).compareTo(new BigInteger(b)) > 0;
        }
        return a.compareTo(b) > 0;
    }

    /** Sleeps for the given time; on interruption returns early with the interrupt flag set. */
    private static void tryToSleep(int millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** A partition/offset pair, printable as a JSON object. */
    static final class Cursor {
        /** A double-quoted run of word characters or hyphens. */
        private static final Pattern TOKEN = Pattern.compile("\"[\\w-]+\"");
        /** Number of leading tokens needed to read a cursor: cursor, key, value, key, value. */
        private static final int CURSOR_TOKENS = 5;

        final String partition;
        final String offset;

        /**
         * Returns every double-quoted token of word characters or hyphens found in
         * {@code batch}, in order of appearance and without the quotes.
         */
        static List<String> parse(String batch) throws Exception {
            return Cursor.tokens(batch, Integer.MAX_VALUE);
        }

        /** Collects at most {@code limit} leading tokens of {@code batch}. */
        private static List<String> tokens(String batch, int limit) {
            List<String> result = new ArrayList<String>();
            Matcher matcher = TOKEN.matcher(batch);
            while (result.size() < limit && matcher.find()) {
                result.add(batch.substring(matcher.start() + 1, matcher.end() - 1));
            }
            return result;
        }

        /**
         * Reads the cursor from the start of a batch. The first five tokens must be
         * {@code cursor} followed by the {@code partition} and {@code offset} keys, each
         * directly followed by its value, in either order.
         *
         * @throws Exception with a message starting with {@code "malformed batch: "} if the
         *                   batch does not start that way
         */
        static Cursor extract(String batch) throws Exception {
            // Only the head of the batch matters; do not tokenise the whole event payload.
            List<String> parsed = Cursor.tokens(batch, CURSOR_TOKENS);
            if (parsed.size() >= CURSOR_TOKENS && "cursor".equals(parsed.get(0))) {
                if ("partition".equals(parsed.get(1)) && "offset".equals(parsed.get(3))) {
                    return new Cursor(parsed.get(2), parsed.get(4));
                }
                if ("partition".equals(parsed.get(3)) && "offset".equals(parsed.get(1))) {
                    return new Cursor(parsed.get(4), parsed.get(2));
                }
            }
            throw new Exception("malformed batch: " + batch);
        }

        Cursor(String partition, String offset) {
            this.partition = partition;
            this.offset = offset;
        }

        /** Renders this cursor as {@code {"partition":"P","offset":"O"}}. */
        @Override
        public String toString() {
            return String.format("{\"partition\":\"%s\",\"offset\":\"%s\"}", this.partition, this.offset);
        }
    }

    /** Signals an unusable HTTP response head; the message carries the status or the reason. */
    public static final class HttpException
    extends Exception {
        public HttpException(String message) {
            super(message);
        }
    }
}

