/*
 * Decompiled with CFR 0.152.
 */
package org.swisspush.gateleen.player.log;

import com.google.common.base.Predicate;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.springframework.web.socket.sockjs.client.RestTemplateXhrTransport;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import org.swisspush.gateleen.player.exchange.Exchange;
import org.swisspush.gateleen.player.log.BufferingRequestLog;
import org.swisspush.gateleen.player.log.Collector;

public class EventBusCollector
extends AbstractWebSocketHandler
implements Collector {
    private String urlPrefix;
    private String sockPath;
    private String address;
    private BufferingRequestLog requestLog = new BufferingRequestLog();
    private Predicate<? super Exchange> filter;
    private SockJsClient client;
    private WebSocketSession session;
    private int nbMessages = 0;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public EventBusCollector(String urlPrefix, String sockPath, String address, Predicate<? super Exchange> filter) {
        this.urlPrefix = urlPrefix;
        this.sockPath = sockPath;
        this.address = address;
        this.filter = filter;
    }

    @Override
    public BufferingRequestLog getRequestLog() {
        return this.requestLog;
    }

    @Override
    public void start() {
        if (this.client == null) {
            ArrayList<Object> transports = new ArrayList<Object>(2);
            transports.add(new WebSocketTransport((WebSocketClient)new StandardWebSocketClient()));
            transports.add(new RestTemplateXhrTransport());
            this.client = new SockJsClient(transports);
            this.client.doHandshake((WebSocketHandler)this, this.urlPrefix + this.sockPath, new Object[0]);
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            new Thread(() -> {
                while (this.client != null) {
                    try {
                        Thread.sleep(4000L);
                        this.ping();
                    }
                    catch (Exception e) {
                        this.log.error("Exception while pinging", (Throwable)e);
                        throw new RuntimeException(e);
                    }
                }
            }).start();
        }
    }

    private void ping() throws IOException {
        if (this.session != null && this.session.isOpen()) {
            this.session.sendMessage((WebSocketMessage)new TextMessage((CharSequence)"{\"type\":\"ping\"}"));
        } else {
            this.log.debug("Cannot ping, websocket session not yet open");
        }
    }

    @Override
    public void stop() {
        this.log.debug("Stopping. Got {} messages", (Object)this.nbMessages);
        if (this.client != null) {
            this.client.stop();
        }
        this.client = null;
    }

    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        ++this.nbMessages;
        String line = new JSONObject((String)message.getPayload()).getString("body");
        this.requestLog.add(new Exchange(this.urlPrefix, new JSONObject(line)));
    }

    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        this.log.debug("Web socket session established");
        this.session = session;
        session.sendMessage((WebSocketMessage)new TextMessage((CharSequence)("{\"type\":\"register\",\"address\":\"" + this.address + "\"}")));
    }

    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        this.log.debug("Websocket transport error", exception);
    }

    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        this.log.debug("Websocket session closed: {}", (Object)status.getReason());
    }
}

