/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.manager.runtime;

import com.google.common.base.Charsets;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.http.HttpEntity;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.config.backend.BackendConfig;
import org.streampipes.manager.runtime.SpDataFormatConverter;
import org.streampipes.manager.runtime.SpDataFormatConverterGenerator;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.jms.ActiveMQConsumer;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.grounding.JmsTransportProtocol;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.model.grounding.TransportFormat;

public enum PipelineElementRuntimeInfoFetcher {
    INSTANCE;

    Logger logger = LoggerFactory.getLogger(JsonParser.class);
    private static Set<String> consumerInstances;
    private Map<String, SpDataFormatConverter> converterMap = new HashMap<String, SpDataFormatConverter>();
    private static final String CONSUMER_GROUP_ID = "streampipes-backend-listener-group-";
    private static final String KAFKA_REST_ACCEPT = "application/vnd.kafka.binary.v2+json";
    private static final String KAFKA_REST_CONTENT_TYPE = "application/vnd.kafka.v2+json";
    private static final String OFFSET_FIELD_NAME = "offset";
    private static final String VALUE_FIELD_NAME = "value";

    public String getCurrentData(SpDataStream spDataStream) throws SpRuntimeException {
        if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) {
            return this.getLatestEventFromKafka(spDataStream);
        }
        return this.getLatestEventFromJms(spDataStream);
    }

    private String getLatestEventFromJms(SpDataStream spDataStream) throws SpRuntimeException {
        final String[] result = new String[]{null};
        final String topic = this.getOutputTopic(spDataStream);
        if (!this.converterMap.containsKey(topic)) {
            this.converterMap.put(topic, new SpDataFormatConverterGenerator(this.getTransportFormat(spDataStream)).makeConverter());
        }
        final ActiveMQConsumer consumer = new ActiveMQConsumer();
        consumer.connect((JmsTransportProtocol)spDataStream.getEventGrounding().getTransportProtocol(), (InternalEventProcessor)new InternalEventProcessor<byte[]>(){

            public void onEvent(byte[] event) {
                try {
                    result[0] = ((SpDataFormatConverter)PipelineElementRuntimeInfoFetcher.this.converterMap.get(topic)).convert(event);
                    consumer.disconnect();
                }
                catch (SpRuntimeException e) {
                    e.printStackTrace();
                }
            }
        });
        while (result[0] == null) {
            try {
                Thread.sleep(300L);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return result[0];
    }

    private TransportFormat getTransportFormat(SpDataStream spDataStream) {
        return (TransportFormat)spDataStream.getEventGrounding().getTransportFormats().get(0);
    }

    private String getLatestEventFromKafka(SpDataStream spDataStream) throws SpRuntimeException {
        String kafkaRestUrl = this.getKafkaRestUrl();
        String kafkaTopic = this.getOutputTopic(spDataStream);
        return this.getLatestSubscription(kafkaRestUrl, kafkaTopic, spDataStream);
    }

    private String getLatestSubscription(String kafkaRestUrl, String kafkaTopic, SpDataStream spDataStream) throws SpRuntimeException {
        String kafkaRestRecordsUrl = this.getConsumerInstanceUrl(kafkaRestUrl, this.getConsumerInstanceId(kafkaTopic), kafkaTopic) + "/records";
        try {
            if (!consumerInstances.contains(this.getConsumerInstanceId(kafkaTopic)) || !this.converterMap.containsKey(kafkaTopic)) {
                this.createSubscription(kafkaRestUrl, kafkaTopic);
                consumerInstances.add(this.getConsumerInstanceId(kafkaTopic));
                this.converterMap.put(kafkaTopic, new SpDataFormatConverterGenerator(this.getTransportFormat(spDataStream)).makeConverter());
            }
            Response response = Request.Get((String)kafkaRestRecordsUrl).addHeader("Accept", KAFKA_REST_ACCEPT).execute();
            return this.extractPayload(response.returnContent().asString(), spDataStream);
        }
        catch (IOException | SpRuntimeException e) {
            if (!e.getMessage().equals("")) {
                this.logger.error("Could not get any sample data from Kafka", e);
            }
            consumerInstances.remove(this.getConsumerInstanceId(kafkaTopic));
            throw new SpRuntimeException(e.getMessage());
        }
    }

    private void createSubscription(String kafkaRestUrl, String kafkaTopic) throws IOException, SpRuntimeException {
        String consumerInstance = this.getConsumerInstanceId(kafkaTopic);
        Integer statusCode = this.createConsumer(kafkaRestUrl, consumerInstance, kafkaTopic);
        Integer subscriptionStatusCode = this.subscribeConsumer(kafkaRestUrl, consumerInstance, kafkaTopic);
        if (subscriptionStatusCode != 204) {
            throw new SpRuntimeException("Could not read message form Kafka-REST: " + kafkaRestUrl);
        }
    }

    private Integer subscribeConsumer(String kafkaRestUrl, String consumerInstance, String kafkaTopic) throws IOException {
        String subscribeConsumerUrl = this.getConsumerInstanceUrl(kafkaRestUrl, consumerInstance, kafkaTopic) + "/subscription";
        return Request.Post((String)subscribeConsumerUrl).addHeader("Content-Type", KAFKA_REST_CONTENT_TYPE).body((HttpEntity)new StringEntity(this.makeSubscribeConsumerBody(kafkaTopic), Charsets.UTF_8)).execute().returnResponse().getStatusLine().getStatusCode();
    }

    private String getConsumerInstanceUrl(String kafkaRestUrl, String consumerInstance, String topic) {
        return kafkaRestUrl + "/consumers/" + this.getConsumerGroupId(topic) + "/instances/" + consumerInstance;
    }

    private String getConsumerGroupId(String topic) {
        return CONSUMER_GROUP_ID + topic;
    }

    private String makeSubscribeConsumerBody(String kafkaTopic) {
        return "{\"topics\":[\"" + kafkaTopic + "\"]}";
    }

    private Integer createConsumer(String kafkaRestUrl, String consumerInstance, String topic) throws IOException {
        String createConsumerUrl = kafkaRestUrl + "/consumers/" + this.getConsumerGroupId(topic);
        return Request.Post((String)createConsumerUrl).addHeader("Content-Type", KAFKA_REST_CONTENT_TYPE).body((HttpEntity)new StringEntity(this.makeCreateConsumerBody(consumerInstance), Charsets.UTF_8)).execute().returnResponse().getStatusLine().getStatusCode();
    }

    private String makeCreateConsumerBody(String consumerInstance) {
        return "{\"name\": \"" + consumerInstance + "\", \"format\": \"binary\", \"auto.offset.reset\": \"latest\"}";
    }

    private String getConsumerInstanceId(String kafkaTopic) {
        return "streampipes-backend-listener-group--" + kafkaTopic;
    }

    private String getOutputTopic(SpDataStream spDataStream) {
        return spDataStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    private String getKafkaRestUrl() {
        return BackendConfig.INSTANCE.getKafkaRestUrl();
    }

    private String extractPayload(String rawResponse, SpDataStream spDataStream) throws SpRuntimeException {
        JsonArray allElements;
        Long lastOffset = 0L;
        JsonElement jsonElement = new JsonParser().parse(rawResponse);
        if (jsonElement.isJsonArray() && (allElements = jsonElement.getAsJsonArray()).size() > 0) {
            JsonObject lastItem = allElements.get(0).getAsJsonObject();
            lastOffset = lastItem.get(OFFSET_FIELD_NAME).getAsLong();
            for (int i = 1; i < allElements.size(); ++i) {
                JsonObject obj = allElements.get(i).getAsJsonObject();
                Long offset = obj.get(OFFSET_FIELD_NAME).getAsLong();
                if (offset <= lastOffset) continue;
                lastItem = obj;
            }
            byte[] content = Base64.getDecoder().decode(lastItem.get(VALUE_FIELD_NAME).getAsString());
            return this.converterMap.get(this.getOutputTopic(spDataStream)).convert(content);
        }
        return "{}";
    }

    static {
        consumerInstances = new HashSet<String>();
    }
}

