/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.spark.core.rdd;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.xml.bind.DatatypeConverter;
import org.apache.spark.Dependency;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.wso2.carbon.analytics.datasource.commons.Record;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.spark.core.internal.ServiceHolder;
import org.wso2.carbon.analytics.spark.core.rdd.AnalyticsRDD;
import org.wso2.carbon.analytics.spark.core.util.CompressedEventAnalyticsUtils;
import org.wso2.carbon.analytics.spark.core.util.PublishingPayload;
import scala.collection.Seq;
import scala.reflect.ClassTag;

/**
 * An {@link AnalyticsRDD} whose row iterator expands each stored record into
 * one {@link Row} per event found in the record's {@code "flowData"} payload.
 *
 * <p>The columns requested by the caller are kept as the output columns, while
 * the column list handed to the superclass is extended with {@code "flowData"}
 * and {@code "meta_compressed"}, which are needed to decode the payload.
 */
public class CompressedEventAnalyticsRDD
extends AnalyticsRDD
implements Serializable {
    private static final long serialVersionUID = 5948588299500227997L;

    /** Columns requested by the caller; these define the layout of every produced row. */
    private List<String> outputColumns;

    /** No-argument constructor; leaves {@link #outputColumns} unset. */
    public CompressedEventAnalyticsRDD() {
    }

    /**
     * Creates the RDD. All arguments are forwarded to the superclass constructor;
     * afterwards the inherited {@code columns} field is replaced by the requested
     * columns plus the columns required for decoding the payload.
     *
     * @param tenantId  forwarded to the superclass
     * @param tableName forwarded to the superclass
     * @param columns   columns requested by the caller (used as output columns)
     * @param sc        forwarded to the superclass
     * @param deps      forwarded to the superclass
     * @param evidence  forwarded to the superclass
     * @param timeFrom  forwarded to the superclass
     * @param timeTo    forwarded to the superclass
     * @param incEnable forwarded to the superclass
     * @param incID     forwarded to the superclass
     */
    public CompressedEventAnalyticsRDD(int tenantId, String tableName, List<String> columns, SparkContext sc, Seq<Dependency<?>> deps, ClassTag<Row> evidence, long timeFrom, long timeTo, boolean incEnable, String incID) {
        super(tenantId, tableName, columns, sc, deps, evidence, timeFrom, timeTo, incEnable, incID);
        this.outputColumns = columns;
        this.columns = this.getAllColumns(columns);
    }

    /**
     * Wraps the record iterator in an adaptor that emits one row per event
     * contained in each record, laid out according to the output columns.
     */
    @Override
    protected Iterator<Row> getRowRecordIteratorAdaptor(Iterator<Record> recordItr, int tenantId, boolean incEnable, String incID) {
        return new CompressedEventRowRecordIteratorAdaptor(recordItr, tenantId, incEnable, incID, this.outputColumns);
    }

    /**
     * Returns a copy of {@code columns} extended with {@code "flowData"} and
     * {@code "meta_compressed"}; neither is added if it is already present.
     */
    private List<String> getAllColumns(List<String> columns) {
        List<String> allColumns = new ArrayList<String>(columns);
        if (!allColumns.contains("flowData")) {
            allColumns.add("flowData");
        }
        // Guarded like "flowData" so a caller that already requested the flag
        // does not end up with a duplicate column name.
        if (!allColumns.contains("meta_compressed")) {
            allColumns.add("meta_compressed");
        }
        return allColumns;
    }

    /**
     * Iterator that flattens records into rows: each record's {@code "flowData"}
     * payload is decoded with Kryo and every contained event becomes one row.
     * When incremental processing is enabled, the largest {@code _timestamp}
     * value handed out is tracked and written to the incremental meta store
     * once the iterator reports that it is exhausted.
     */
    private static class CompressedEventRowRecordIteratorAdaptor
    implements Iterator<Row>,
    Serializable {
        private static final long serialVersionUID = -8866801517386445810L;
        private Iterator<Record> recordItr;
        /** Rows of the record currently being drained; {@code null} when none is loaded. */
        private Iterator<Row> rows;
        private int tenantId;
        private boolean incEnable;
        private String incID;
        /** Largest timestamp returned so far by {@link #next()}. */
        private long incMaxTS = Long.MIN_VALUE;
        /** Position of {@code "_timestamp"} in the output columns, or -1 if absent. */
        private int timestampIndex;
        private Kryo kryo = new Kryo();
        private List<String> columns;

        public CompressedEventRowRecordIteratorAdaptor(Iterator<Record> recordItr, int tenantId, boolean incEnable, String incID, List<String> columns) {
            this.columns = columns;
            this.recordItr = recordItr;
            this.tenantId = tenantId;
            this.incEnable = incEnable;
            this.incID = incID;
            this.timestampIndex = columns.indexOf("_timestamp");
            // NOTE(review): these registration ids presumably have to match the ids
            // used by whatever wrote the payload - confirm against the publisher.
            this.kryo.register(HashMap.class, 111);
            this.kryo.register(ArrayList.class, 222);
            this.kryo.register(PublishingPayload.class, 333);
        }

        /**
         * Advances to the next record that yields at least one row.
         *
         * <p>Implemented as a loop rather than recursion so that a long run of
         * records producing no rows cannot overflow the stack, and so that the
         * incremental timestamp is updated at most once per call.
         *
         * @return {@code true} if a row is available, {@code false} once both the
         *         current rows and the record iterator are exhausted
         */
        @Override
        public boolean hasNext() {
            while (this.rows == null || !this.rows.hasNext()) {
                if (!this.recordItr.hasNext()) {
                    this.rows = null;
                    if (this.incEnable) {
                        this.updateIncProcessingTS();
                    }
                    return false;
                }
                this.rows = this.recordToRows(this.recordItr.next());
            }
            return true;
        }

        /**
         * Stores {@link #incMaxTS} as the last processed timestamp if it is newer
         * than the value currently held by the incremental meta store.
         *
         * @throws RuntimeException wrapping any {@link AnalyticsException} raised by the store
         */
        private void updateIncProcessingTS() {
            try {
                long existingIncTS = ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(this.tenantId, this.incID, false);
                if (existingIncTS < this.incMaxTS) {
                    ServiceHolder.getIncrementalMetaStore().setLastProcessedTimestamp(this.tenantId, this.incID, this.incMaxTS, false);
                }
            }
            catch (AnalyticsException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }

        /**
         * Returns the next row and, when incremental processing is enabled,
         * records its timestamp if it is the largest seen so far.
         *
         * @throws NoSuchElementException if no further row is available
         */
        @Override
        public Row next() {
            if (!this.hasNext()) {
                // Previously a null row was produced here and then dereferenced
                // when incremental processing was enabled.
                throw new NoSuchElementException();
            }
            Row row = this.rows.next();
            if (this.incEnable) {
                // NOTE(review): this still fails if "_timestamp" is not among the
                // output columns (index -1) or the row is the empty placeholder row.
                long rowTimestamp = row.getLong(this.timestampIndex);
                if (rowTimestamp > this.incMaxTS) {
                    this.incMaxTS = rowTimestamp;
                }
            }
            return row;
        }

        /**
         * Decodes a record's {@code "flowData"} payload and builds one row per
         * entry of the decoded {@code "events"} list. A record without
         * {@code "flowData"} yields a single empty row.
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private Iterator<Row> recordToRows(Record record) {
            List<Row> tempRows = new ArrayList<Row>();
            Map<?, ?> recordVals = record.getValues();
            Object flowData = recordVals.get("flowData");
            if (flowData != null) {
                String eventsString = flowData.toString();
                // Boolean.TRUE.equals avoids an unboxing NPE when the flag is absent;
                // a missing flag is treated as "not compressed".
                boolean compressed = Boolean.TRUE.equals(recordVals.get("meta_compressed"));
                ByteArrayInputStream payloadStream = compressed
                        ? CompressedEventAnalyticsUtils.decompress(eventsString)
                        : new ByteArrayInputStream(DatatypeConverter.parseBase64Binary(eventsString));
                Input input = new Input(payloadStream);
                Map<?, ?> aggregatedEvent = this.kryo.readObject(input, HashMap.class);
                // Raw types are kept because the element types expected by
                // getFieldValues are not visible from this file.
                List eventsList = (List) aggregatedEvent.get("events");
                List payloadsList = (List) aggregatedEvent.get("payloads");
                int metaTenantId = 0;
                if (recordVals.containsKey("meta_tenantId")) {
                    metaTenantId = (Integer) recordVals.get("meta_tenantId");
                }
                String host = null;
                if (aggregatedEvent.containsKey("host")) {
                    host = aggregatedEvent.get("host").toString();
                }
                for (int i = 0; i < eventsList.size(); ++i) {
                    tempRows.add(RowFactory.create((Object[]) CompressedEventAnalyticsUtils.getFieldValues(this.columns, (List) eventsList.get(i), payloadsList, i, record.getTimestamp(), record.getTenantId(), metaTenantId, host)));
                }
            } else {
                tempRows.add(RowFactory.create((Object[]) Collections.emptyList().toArray()));
            }
            return tempRows.iterator();
        }

        /** Delegates to the underlying record iterator's {@code remove()}. */
        @Override
        public void remove() {
            this.recordItr.remove();
        }
    }
}

