/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.spark.core.sources;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataService;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceImpl;
import org.wso2.carbon.analytics.datasource.commons.AnalyticsSchema;
import org.wso2.carbon.analytics.datasource.commons.Record;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsTableNotAvailableException;
import org.wso2.carbon.analytics.datasource.core.util.GenericUtils;
import org.wso2.carbon.analytics.spark.core.internal.ServiceHolder;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsCommonUtils;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

public class AnalyticsWritingFunction
extends AbstractFunction1<Iterator<Row>, BoxedUnit>
implements Serializable {
    private static final long serialVersionUID = -1919222653470217466L;
    private static final Log log = LogFactory.getLog(AnalyticsWritingFunction.class);
    private int tId;
    private String tName;
    private StructType sch;
    private boolean globalTenantAccess;
    private String schemaString;
    private String primaryKeys;
    private boolean mergeFlag;
    private String recordStore;

    public AnalyticsWritingFunction(int tId, String tName, StructType sch, boolean globalTenantAccess, String schemaString, String primaryKeys, boolean mergeFlag, String recordStore) {
        this.tId = tId;
        this.tName = tName;
        this.sch = sch;
        this.globalTenantAccess = globalTenantAccess;
        this.schemaString = schemaString;
        this.primaryKeys = primaryKeys;
        this.mergeFlag = mergeFlag;
        this.recordStore = recordStore;
    }

    private void handleAnalyticsTableSchemaInvalidation() {
        AnalyticsDataService ads = ServiceHolder.getAnalyticsDataService();
        if (ads instanceof AnalyticsDataServiceImpl) {
            int targetTenantId = this.globalTenantAccess ? -2000 : this.tId;
            ((AnalyticsDataServiceImpl)ads).invalidateAnalyticsTableInfo(targetTenantId, this.tName);
        }
    }

    public BoxedUnit apply(Iterator<Row> iterator) {
        ArrayList<Row> rows = new ArrayList<Row>(1000);
        this.handleAnalyticsTableSchemaInvalidation();
        while (iterator.hasNext()) {
            if (rows.size() >= 1000) continue;
            Row row = (Row)iterator.next();
            rows.add(row);
            if (rows.size() != 1000) continue;
            this.recordsPut(rows);
            rows.clear();
        }
        if (!rows.isEmpty()) {
            this.recordsPut(rows);
        }
        return BoxedUnit.UNIT;
    }

    private void recordsPut(List<Row> rows) {
        if (this.globalTenantAccess) {
            this.recordsPutGlobal(rows);
        } else {
            this.recordsPutNormal(rows);
        }
    }

    private void recordsPutNormal(List<Row> rows) {
        ArrayList<Record> records = new ArrayList<Record>(rows.size());
        for (Row row : rows) {
            records.add(this.convertRowAndSchemaToRecord(row, this.sch, false));
        }
        AnalyticsDataService ads = ServiceHolder.getAnalyticsDataService();
        try {
            ads.put(records);
        }
        catch (AnalyticsException e) {
            String msg = "Error while inserting data into table " + this.tName + ": " + e.getMessage();
            log.error((Object)msg, (Throwable)e);
            throw new RuntimeException(msg, e);
        }
    }

    private void recordsPutGlobal(List<Row> rows) {
        ArrayList<Record> records = new ArrayList<Record>(rows.size());
        for (Row row : rows) {
            records.add(this.convertRowAndSchemaToRecord(row, this.sch, true));
        }
        AnalyticsDataService ads = ServiceHolder.getAnalyticsDataService();
        try {
            for (List recordsBatch : GenericUtils.generateRecordBatches(records)) {
                try {
                    ads.put(recordsBatch);
                }
                catch (AnalyticsTableNotAvailableException e) {
                    Record firstRecord = (Record)recordsBatch.get(0);
                    this.createTargetTableAndSetSchema(ads, firstRecord.getTenantId(), firstRecord.getTableName());
                    ads.put(recordsBatch);
                }
            }
        }
        catch (AnalyticsException e) {
            String msg = "Error while inserting data into table " + this.tName + ": " + e.getMessage();
            log.error((Object)msg, (Throwable)e);
            throw new RuntimeException(msg, e);
        }
    }

    protected void createTargetTableAndSetSchema(AnalyticsDataService ads, int targetTenantId, String targetTableName) throws AnalyticsException {
        AnalyticsCommonUtils.createTableIfNotExists(ads, this.recordStore, targetTenantId, targetTableName);
        AnalyticsSchema schema = AnalyticsCommonUtils.createAnalyticsTableSchema(ads, targetTenantId, targetTableName, this.schemaString, this.primaryKeys, this.globalTenantAccess, this.mergeFlag, false);
        ads.setTableSchema(targetTenantId, targetTableName, schema);
    }

    private Record convertRowAndSchemaToRecord(Row row, StructType schema, boolean global) {
        String[] colNames = schema.fieldNames();
        HashMap<String, Object> result = new HashMap<String, Object>();
        long timestamp = -1L;
        int targetTenantId = this.tId;
        boolean globalTenantProcessed = false;
        for (int i = 0; i < row.length(); ++i) {
            if (colNames[i].equals("_timestamp")) {
                timestamp = row.getLong(i);
                continue;
            }
            if (global && colNames[i].equals("_tenantId")) {
                targetTenantId = row.getInt(i);
                globalTenantProcessed = true;
                continue;
            }
            result.put(colNames[i], row.get(i));
        }
        if (global && !globalTenantProcessed) {
            throw new RuntimeException("The field '_tenantId' is not found in row: " + row + " with schema: " + schema + " when creating a global tenant access record");
        }
        if (timestamp < 0L) {
            return new Record(targetTenantId, this.tName, result);
        }
        return new Record(targetTenantId, this.tName, result, timestamp);
    }
}

