/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.spark.core.rdd;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.Dependency;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.wso2.carbon.analytics.dataservice.commons.AnalyticsDataResponse;
import org.wso2.carbon.analytics.datasource.commons.AnalyticsIterator;
import org.wso2.carbon.analytics.datasource.commons.Record;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.spark.core.internal.ServiceHolder;
import org.wso2.carbon.analytics.spark.core.sources.AnalyticsPartition;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.reflect.ClassTag;

/**
 * Spark {@link RDD} of {@link Row}s whose data is read through the analytics data service
 * obtained from {@link ServiceHolder}.
 *
 * <p>Each Spark partition is an {@link AnalyticsPartition} wrapping one entry (record store name
 * plus record group) of the response returned by the data service for the configured tenant,
 * table, column list and time range. Records are converted to rows in the order given by
 * {@link #columns}.
 *
 * <p>When incremental processing is enabled ({@code incEnable}), the largest record timestamp
 * seen while iterating a partition is written to the incremental meta store under
 * {@code incID} once the partition's iterator reports exhaustion.
 */
public class AnalyticsRDD extends RDD<Row> implements Serializable {
    private static final Log log = LogFactory.getLog(AnalyticsRDD.class);
    private static final long serialVersionUID = 5948588299500227997L;

    /** Value passed to the data service as partition hint when no analytics executor is registered. */
    private static final int FALLBACK_PARTITIONS_HINT = 6;
    /** Column name that is filled from {@code Record#getTimestamp()} instead of the record values. */
    private static final String TIMESTAMP_COLUMN = "_timestamp";
    /** Column name that is filled from {@code Record#getTenantId()} instead of the record values. */
    private static final String TENANT_ID_COLUMN = "_tenantId";

    /** Ordered column names; position {@code i} here is position {@code i} in every produced row. */
    protected List<String> columns;
    private int tenantId;
    private String tableName;
    private long timeFrom;
    private long timeTo;
    private boolean incEnable;
    private String incID;

    /**
     * Creates an instance with no Spark context, dependencies or class tag and with all fields
     * left at their defaults.
     */
    public AnalyticsRDD() {
        super(null, null, null);
    }

    /**
     * Creates an RDD over the given table.
     *
     * @param tenantId  tenant whose table is read
     * @param tableName table to read
     * @param columns   ordered column names that define the layout of each produced row
     * @param sc        Spark context handed to the {@link RDD} super constructor
     * @param deps      dependencies handed to the {@link RDD} super constructor
     * @param evidence  class tag handed to the {@link RDD} super constructor
     * @param timeFrom  lower time bound passed to the data service
     * @param timeTo    upper time bound passed to the data service
     * @param incEnable whether the last processed timestamp is tracked in the incremental meta store
     * @param incID     identifier under which the last processed timestamp is stored
     */
    public AnalyticsRDD(int tenantId, String tableName, List<String> columns, SparkContext sc, Seq<Dependency<?>> deps, ClassTag<Row> evidence, long timeFrom, long timeTo, boolean incEnable, String incID) {
        super(sc, deps, evidence);
        this.tenantId = tenantId;
        this.tableName = tableName;
        this.columns = columns;
        this.timeFrom = timeFrom;
        this.timeTo = timeTo;
        this.incEnable = incEnable;
        this.incID = incID;
    }

    /**
     * Reads the records of one partition and exposes them as a Scala iterator of rows.
     *
     * @param split   the partition to compute; must be an {@link AnalyticsPartition}
     * @param context the task context used to make the returned iterator interruptible
     * @return an {@link InterruptibleIterator} over the rows of the partition
     * @throws RuntimeException wrapping the {@link AnalyticsException} if the records cannot be read
     */
    public Iterator<Row> compute(Partition split, TaskContext context) {
        AnalyticsPartition partition = (AnalyticsPartition) split;
        try {
            // Kept raw + cast as in the original; the generic signature of readRecords is not visible here.
            AnalyticsIterator recordsItr = ServiceHolder.getAnalyticsDataService()
                    .readRecords(partition.getRecordStoreName(), partition.getRecordGroup());
            java.util.Iterator<Row> rows = this.getRowRecordIteratorAdaptor(
                    (java.util.Iterator<Record>) recordsItr, this.tenantId, this.incEnable, this.incID);
            return new InterruptibleIterator<Row>(context, JavaConversions.asScalaIterator(rows));
        } catch (AnalyticsException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Factory for the record-to-row adaptor used by {@link #compute}; subclasses may override it
     * to supply a different adaptor.
     *
     * @param recordItr iterator over the records of one partition
     * @param tenantId  tenant id used when updating the incremental meta store
     * @param incEnable whether incremental timestamp tracking is enabled
     * @param incID     identifier used when updating the incremental meta store
     * @return an iterator that converts each record to a {@link Row}
     */
    protected java.util.Iterator<Row> getRowRecordIteratorAdaptor(java.util.Iterator<Record> recordItr, int tenantId, boolean incEnable, String incID) {
        return new RowRecordIteratorAdaptor(recordItr, tenantId, incEnable, incID);
    }

    /**
     * Returns the locations reported by the partition's record group, or the default
     * implementation's answer when the split is not an {@link AnalyticsPartition} or the
     * locations cannot be obtained.
     *
     * @param split the partition whose preferred locations are requested
     * @return the preferred locations for {@code split}
     */
    public Seq<String> getPreferredLocations(Partition split) {
        if (!(split instanceof AnalyticsPartition)) {
            return super.getPreferredLocations(split);
        }
        AnalyticsPartition ap = (AnalyticsPartition) split;
        try {
            return JavaConversions.asScalaBuffer(Arrays.asList(ap.getRecordGroup().getLocations())).toList();
        } catch (AnalyticsException e) {
            // Best effort: a failure here must not fail the job, so log and use the default.
            log.error("Error in getting preffered location: " + e.getMessage() + " falling back to default impl.", e);
            return super.getPreferredLocations(split);
        }
    }

    /**
     * Asks the data service for the entries covering the configured tenant, table, columns and
     * time range, and wraps each entry in an {@link AnalyticsPartition} whose index is the
     * entry's position in the response.
     *
     * @return one partition per response entry
     * @throws RuntimeException wrapping the {@link AnalyticsException} if the data service call fails
     */
    public Partition[] getPartitions() {
        AnalyticsDataResponse resp;
        try {
            // NOTE(review): the trailing 0 / -1 are presumably record offset and count
            // (-1 = unlimited) -- confirm against the data service's get(...) contract.
            resp = ServiceHolder.getAnalyticsDataService().get(this.tenantId, this.tableName, this.computePartitions(), this.columns, this.timeFrom, this.timeTo, 0, -1);
        } catch (AnalyticsException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        List<?> entries = resp.getEntries();
        Partition[] result = new Partition[entries.size()];
        for (int i = 0; i < result.length; ++i) {
            AnalyticsDataResponse.Entry entry = (AnalyticsDataResponse.Entry) entries.get(i);
            result[i] = new AnalyticsPartition(entry.getRecordStoreName(), entry.getRecordGroup(), i);
        }
        return result;
    }

    /**
     * Returns the partition hint of the registered analytics executor, or
     * {@link #FALLBACK_PARTITIONS_HINT} when no executor is registered.
     */
    private int computePartitions() throws AnalyticsException {
        if (ServiceHolder.getAnalyticskExecutor() != null) {
            return ServiceHolder.getAnalyticskExecutor().getNumPartitionsHint();
        }
        return FALLBACK_PARTITIONS_HINT;
    }

    /**
     * Adapts an iterator of {@link Record}s to an iterator of {@link Row}s laid out according to
     * the enclosing RDD's {@link AnalyticsRDD#columns}, while tracking the largest record
     * timestamp seen when incremental processing is enabled.
     */
    private class RowRecordIteratorAdaptor implements java.util.Iterator<Row>, Serializable {
        private static final long serialVersionUID = -8866801517386445810L;
        private java.util.Iterator<Record> recordItr;
        private int tenantId;
        private boolean incEnable;
        private String incID;
        /** Largest record timestamp returned so far; stays at {@code Long.MIN_VALUE} until a record is seen. */
        private long incMaxTS = Long.MIN_VALUE;

        public RowRecordIteratorAdaptor(java.util.Iterator<Record> recordItr, int tenantId, boolean incEnable, String incID) {
            this.recordItr = recordItr;
            this.tenantId = tenantId;
            this.incEnable = incEnable;
            this.incID = incID;
        }

        /**
         * Delegates to the underlying record iterator. Every time the underlying iterator reports
         * exhaustion and incremental processing is enabled, the incremental meta store is
         * consulted (and updated if the stored value is older than the maximum seen here).
         */
        @Override
        public boolean hasNext() {
            boolean hasNext = this.recordItr.hasNext();
            if (!hasNext && this.incEnable) {
                this.updateIncProcessingTS();
            }
            return hasNext;
        }

        /**
         * Stores {@link #incMaxTS} as the last processed timestamp if it is strictly greater than
         * the value currently held by the incremental meta store.
         *
         * @throws RuntimeException wrapping the {@link AnalyticsException} raised by the meta store
         */
        private void updateIncProcessingTS() {
            try {
                long existingIncTS = ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(this.tenantId, this.incID, false);
                if (existingIncTS < this.incMaxTS) {
                    ServiceHolder.getIncrementalMetaStore().setLastProcessedTimestamp(this.tenantId, this.incID, this.incMaxTS, false);
                }
            } catch (AnalyticsException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Returns the next record converted to a row, or {@code null} if the underlying iterator
         * yields a {@code null} record.
         */
        @Override
        public Row next() {
            Record record = this.recordItr.next();
            if (record == null) {
                // Same result recordToRow gives for null, but without dereferencing the record
                // for timestamp tracking first (previously an NPE when incEnable was true).
                return null;
            }
            if (this.incEnable) {
                long recordTS = record.getTimestamp();
                if (recordTS > this.incMaxTS) {
                    this.incMaxTS = recordTS;
                }
            }
            return this.recordToRow(record);
        }

        /**
         * Builds a row with one value per configured column: the record timestamp for
         * {@code "_timestamp"}, the record tenant id for {@code "_tenantId"}, and the record's
         * value stored under the column name otherwise (which may be of any type, or absent).
         */
        private Row recordToRow(Record record) {
            if (record == null) {
                return null;
            }
            List<String> cols = AnalyticsRDD.this.columns;
            Map<?, ?> recordVals = record.getValues();
            Object[] rowVals = new Object[cols.size()];
            for (int i = 0; i < rowVals.length; ++i) {
                String column = cols.get(i);
                if (TIMESTAMP_COLUMN.equals(column)) {
                    rowVals[i] = Long.valueOf(record.getTimestamp());
                } else if (TENANT_ID_COLUMN.equals(column)) {
                    rowVals[i] = Integer.valueOf(record.getTenantId());
                } else {
                    // No numeric cast here: regular column values can be strings, booleans, etc.
                    rowVals[i] = recordVals.get(column);
                }
            }
            return RowFactory.create(rowVals);
        }

        /** Delegates removal to the underlying record iterator. */
        @Override
        public void remove() {
            this.recordItr.remove();
        }
    }
}

