/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.carbon.analytics.spark.core.internal;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.deploy.master.LeaderElectable;
import org.apache.spark.deploy.master.Master;
import org.apache.spark.deploy.worker.Worker;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.jdbc.carbon.AnalyticsJDBCRelationProvider;
import org.apache.spark.util.Utils;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsDataServiceUtils;
import org.wso2.carbon.analytics.dataservice.core.AnalyticsServiceHolder;
import org.wso2.carbon.analytics.dataservice.core.clustering.AnalyticsClusterException;
import org.wso2.carbon.analytics.dataservice.core.clustering.AnalyticsClusterManager;
import org.wso2.carbon.analytics.dataservice.core.clustering.GroupEventListener;
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
import org.wso2.carbon.analytics.datasource.core.util.GenericUtils;
import org.wso2.carbon.analytics.spark.core.AnalyticsExecutionCall;
import org.wso2.carbon.analytics.spark.core.deploy.AnalyticsRecoveryModeFactory;
import org.wso2.carbon.analytics.spark.core.deploy.CheckElectedLeaderExecutionCall;
import org.wso2.carbon.analytics.spark.core.deploy.ElectLeaderExecutionCall;
import org.wso2.carbon.analytics.spark.core.deploy.InitClientExecutionCall;
import org.wso2.carbon.analytics.spark.core.deploy.StartWorkerExecutionCall;
import org.wso2.carbon.analytics.spark.core.exception.AnalyticsExecutionException;
import org.wso2.carbon.analytics.spark.core.exception.AnalyticsUDFException;
import org.wso2.carbon.analytics.spark.core.internal.ServiceHolder;
import org.wso2.carbon.analytics.spark.core.sources.AnalyticsRelationProvider;
import org.wso2.carbon.analytics.spark.core.sources.CompressedEventAnalyticsRelationProvider;
import org.wso2.carbon.analytics.spark.core.udf.AnalyticsUDFsRegister;
import org.wso2.carbon.analytics.spark.core.udf.CarbonUDF;
import org.wso2.carbon.analytics.spark.core.udf.config.UDFConfiguration;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsCommonUtils;
import org.wso2.carbon.analytics.spark.core.util.AnalyticsQueryResult;
import org.wso2.carbon.analytics.spark.core.util.SparkTableNamesHolder;
import org.wso2.carbon.analytics.spark.utils.ComputeClasspath;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.utils.CarbonUtils;
import scala.None$;
import scala.Option;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Traversable;

public class SparkAnalyticsExecutor
implements GroupEventListener {
    private static final String CLUSTER_GROUP_NAME = "CARBON_ANALYTICS_EXECUTION";
    private static final String DEFAULT_SPARK_APP_NAME = "DefaultCarbonAnalyticsApp";
    private static final Log log = LogFactory.getLog(SparkAnalyticsExecutor.class);
    private String sparkMaster;
    private SparkConf sparkConf;
    private SQLContext sqlCtx;
    private String myHost;
    private int portOffset = 0;
    private int workerCount = 1;
    private SparkTableNamesHolder sparkTableNamesHolder;
    private UDFConfiguration udfConfiguration;
    private int redundantMasterCount = 1;
    private Set<LeaderElectable> leaderElectable = new HashSet<LeaderElectable>();
    private AnalyticsClusterManager acm;
    private boolean masterActive = false;
    private boolean workerActive = false;
    private boolean clientActive = false;
    private boolean electedLeader = false;
    private java.util.Map<String, String> shorthandStringsMap = new HashMap<String, String>();
    private static final int MAX_RETRIES = 30;
    private static final long MAX_RETRY_WAIT_INTERVAL = 60000L;
    private ClusterMode clusterMode;

    public SparkAnalyticsExecutor(String myHost, int portOffset) throws AnalyticsException {
        this.myHost = myHost;
        this.portOffset = portOffset;
        this.udfConfiguration = this.loadUDFConfiguration();
        this.acm = AnalyticsServiceHolder.getAnalyticsClusterManager();
        String propsFile = GenericUtils.getAnalyticsConfDirectory() + File.separator + "analytics" + File.separator + "spark" + File.separator + "spark-defaults.conf";
        if (!new File(propsFile).exists()) {
            throw new AnalyticsExecutionException("spark-defaults.conf file does not exists in path " + propsFile);
        }
        this.sparkConf = this.initializeSparkConf(this.portOffset, propsFile);
        this.sparkMaster = this.getStringFromSparkConf("carbon.spark.master", "local");
        this.clusterMode = this.getClusterMode(this.sparkMaster);
        this.redundantMasterCount = this.sparkConf.getInt("carbon.spark.master.count", 1);
        this.sparkTableNamesHolder = new SparkTableNamesHolder(this.acm.isClusteringEnabled());
        this.registerShorthandStrings();
    }

    public void initializeSparkServer() throws AnalyticsException {
        this.sparkConf.setMaster(this.sparkMaster);
        switch (this.clusterMode) {
            case local: 
            case standaloneSpark: 
            case yarn: 
            case mesos: {
                log.info((Object)("Starting SPARK in the Client " + this.clusterMode.toString() + ". Master : " + this.sparkMaster));
                if (this.acm.isClusteringEnabled()) {
                    this.acm.joinGroup(CLUSTER_GROUP_NAME, (GroupEventListener)this);
                }
                this.initializeAnalyticsClient();
                break;
            }
            case carbonSpark: {
                log.info((Object)"Starting SPARK in the Carbon Clustering mode");
                this.redundantMasterCount = this.sparkConf.getInt("carbon.spark.master.count", 2);
                if (this.sparkTableNamesHolder == null) {
                    this.sparkTableNamesHolder = new SparkTableNamesHolder(true);
                }
                if (this.acm.isClusteringEnabled()) {
                    this.runClusteredSetupLogic();
                    break;
                }
                throw new AnalyticsClusterException("Spark started in the cluster mode without enabling Carbon Clustering");
            }
        }
    }

    private void runClusteredSetupLogic() throws AnalyticsException {
        String thisMasterUrl = "spark://" + this.myHost + ":" + this.sparkConf.get("spark.master.port");
        this.logDebug("Spark master URL for this node : " + thisMasterUrl);
        HazelcastInstance hz = AnalyticsServiceHolder.getHazelcastInstance();
        IMap masterMap = hz.getMap("__SPARK_MASTER_MAP__");
        Object localMember = this.acm.getLocalMember();
        log.info((Object)("Local member : " + localMember));
        Set masterUrls = masterMap.keySet();
        this.logDebug("Master URLs : " + Arrays.toString(masterUrls.toArray()));
        log.info((Object)("Current Spark Master map size : " + masterMap.size()));
        if (masterUrls.contains(thisMasterUrl) || masterMap.size() < this.redundantMasterCount) {
            log.info((Object)"Masters available are less than the redundant master count or This is/ has been a member of the MasterMap");
            if (!masterUrls.contains(thisMasterUrl)) {
                log.info((Object)("Adding member to the Spark Master map : " + localMember));
                masterMap.put(thisMasterUrl, localMember);
            }
            log.info((Object)"Starting SPARK MASTER...");
            this.startMaster();
        }
        this.acm.joinGroup(CLUSTER_GROUP_NAME, (GroupEventListener)this);
        log.info((Object)("Member joined the Carbon Analytics Execution cluster : " + localMember));
        this.processLeaderElectable();
        log.info((Object)("Spark Master map size after starting masters : " + masterMap.size()));
        if (masterMap.size() >= this.redundantMasterCount) {
            log.info((Object)"Redundant master count reached. Starting workers in all members...");
            this.acm.executeAll(CLUSTER_GROUP_NAME, (Callable)new StartWorkerExecutionCall());
            log.info((Object)"Redundant master count reached. Starting Spark client app in the carbon cluster master...");
            this.initializeAnalyticsClient();
        }
    }

    private String[] getSparkMastersFromCluster() {
        HazelcastInstance hz = AnalyticsServiceHolder.getHazelcastInstance();
        IMap masterMap = hz.getMap("__SPARK_MASTER_MAP__");
        Set masterUrls = masterMap.keySet();
        return masterUrls.toArray(new String[masterUrls.size()]);
    }

    private UDFConfiguration loadUDFConfiguration() throws AnalyticsException {
        try {
            File confFile = new File(GenericUtils.getAnalyticsConfDirectory() + File.separator + "analytics" + File.separator + "spark" + File.separator + "spark-udf-config.xml");
            if (!confFile.exists()) {
                throw new AnalyticsUDFException("Cannot load UDFs, the UDF configuration file cannot be found at: " + confFile.getPath());
            }
            JAXBContext ctx = JAXBContext.newInstance((Class[])new Class[]{UDFConfiguration.class});
            Unmarshaller unmarshaller = ctx.createUnmarshaller();
            return (UDFConfiguration)unmarshaller.unmarshal(confFile);
        }
        catch (JAXBException e) {
            throw new AnalyticsUDFException("Error in processing UDF configuration: " + e.getMessage(), e);
        }
    }

    private void initializeAnalyticsClient() throws AnalyticsException {
        if (this.acm.isClusteringEnabled()) {
            log.info((Object)"Sending a cluster message to the leader to initialize the Spark application");
            this.acm.executeOne(CLUSTER_GROUP_NAME, this.acm.getLeader(CLUSTER_GROUP_NAME), (Callable)new InitClientExecutionCall());
        } else {
            log.info((Object)"Initializing the Spark application locally");
            this.initializeAnalyticsClientLocal();
        }
    }

    public synchronized void initializeAnalyticsClientLocal() throws AnalyticsException {
        if (ServiceHolder.isAnalyticsSparkContextEnabled()) {
            if (!this.clientActive) {
                if (this.clusterMode == ClusterMode.carbonSpark) {
                    this.updateMaster(this.sparkConf);
                }
                this.initializeSqlContext(this.initializeSparkContext(this.sparkConf));
                this.clientActive = true;
                log.info((Object)("Started Spark CLIENT in the cluster pointing to MASTER " + this.sparkConf.get("spark.master") + " with the application name : " + this.sparkConf.get("spark.app.name") + " and UI port : " + this.sparkConf.get("spark.ui.port")));
            } else {
                log.info((Object)"Client is already active in this node, therefore ignoring client init");
            }
        } else {
            this.logDebug("Analytics Spark Context is disabled in this node, therefore ignoring the client initiation.");
        }
    }

    private JavaSparkContext initializeSparkContext(SparkConf conf) throws AnalyticsException {
        JavaSparkContext jsc;
        try {
            jsc = new JavaSparkContext(conf);
        }
        catch (Throwable e) {
            throw new AnalyticsException("Unable to create analytics client. " + e.getMessage(), e);
        }
        ServiceHolder.setJavaSparkContext(jsc);
        return jsc;
    }

    private void initializeSqlContext(JavaSparkContext jsc) throws AnalyticsUDFException {
        this.sqlCtx = new SQLContext(jsc);
        this.registerUDFs(this.sqlCtx);
    }

    public void registerUDFFromOSGIComponent(CarbonUDF carbonUDF) throws AnalyticsUDFException {
        if (this.sqlCtx != null) {
            Method[] methods;
            AnalyticsUDFsRegister analyticsUDFsRegister = AnalyticsUDFsRegister.getInstance();
            Class<Object> udf = carbonUDF.getClass();
            for (Method method : methods = udf.getDeclaredMethods()) {
                if (!Modifier.isPublic(method.getModifiers())) continue;
                analyticsUDFsRegister.registerUDF(udf, method, this.sqlCtx);
            }
        } else {
            ServiceHolder.addCarbonUDFs(carbonUDF);
        }
    }

    private void registerUDFs(SQLContext sqlCtx) throws AnalyticsUDFException {
        ArrayList<String> udfClassNames = new ArrayList<String>();
        if (!this.udfConfiguration.getCustomUDFClass().isEmpty()) {
            udfClassNames.addAll(this.udfConfiguration.getCustomUDFClass());
        }
        if (!ServiceHolder.getCarbonUDFs().isEmpty()) {
            udfClassNames.addAll(ServiceHolder.getCarbonUDFs().keySet());
        }
        AnalyticsUDFsRegister udfAdaptorBuilder = AnalyticsUDFsRegister.getInstance();
        try {
            for (String udfClassName : udfClassNames) {
                Method[] methods;
                if ((udfClassName = udfClassName.trim()).isEmpty()) continue;
                Class<Object> udf = Class.forName(udfClassName);
                for (Method method : methods = udf.getDeclaredMethods()) {
                    try {
                        if (!Modifier.isPublic(method.getModifiers())) continue;
                        udfAdaptorBuilder.registerUDF(udf, method, sqlCtx);
                    }
                    catch (AnalyticsUDFException e) {
                        log.error((Object)("Error while registering the UDF method: " + method.getName() + ", " + e.getMessage()), (Throwable)((Object)e));
                    }
                }
            }
        }
        catch (ClassNotFoundException e) {
            throw new AnalyticsUDFException("Error While registering UDFs: " + e.getMessage(), e);
        }
    }

    private synchronized void startMaster() throws AnalyticsClusterException {
        if (!this.masterActive) {
            String host = this.myHost;
            int port = this.sparkConf.getInt("spark.master.port", 7077 + this.portOffset);
            int webUiPort = this.sparkConf.getInt("spark.master.webui.port", 8081 + this.portOffset);
            Master.startSystemAndActor((String)host, (int)port, (int)webUiPort, (SparkConf)this.sparkConf);
            log.info((Object)("Started SPARK MASTER in spark://" + host + ":" + port + " with webUI port : " + webUiPort));
            this.updateMaster(this.sparkConf);
            this.masterActive = true;
        } else {
            this.logDebug("Master is already active in this node, therefore ignoring Master startup");
        }
    }

    private void processLeaderElectable() throws AnalyticsClusterException {
        if (!this.isElectedLeaderAvailable()) {
            log.info((Object)"No elected leader is available. Hence electing this member as the leader");
            this.electAsLeader();
        }
    }

    private boolean isElectedLeaderAvailable() throws AnalyticsClusterException {
        if (this.acm.getMembers(CLUSTER_GROUP_NAME).isEmpty()) {
            log.info((Object)"Cluster is empty. Hence no elected leader available");
            return false;
        }
        List clusterElectedLeaders = this.acm.executeAll(CLUSTER_GROUP_NAME, (Callable)new CheckElectedLeaderExecutionCall());
        return clusterElectedLeaders.contains(true);
    }

    public boolean isElectedLeader() {
        return this.electedLeader;
    }

    private void updateMaster(SparkConf conf) {
        String[] masters = this.getSparkMastersFromCluster();
        StringBuilder buf = new StringBuilder();
        buf.append("spark://");
        for (int i = 0; i < masters.length; ++i) {
            buf.append(masters[i].replace("spark://", ""));
            if (i >= masters.length - 1) continue;
            buf.append(",");
        }
        conf.setMaster(buf.toString());
    }

    private void addSparkPropertiesPortOffset(SparkConf conf, int portOffset) {
        Tuple2[] properties;
        for (Tuple2 prop : properties = conf.getAll()) {
            String key = ((String)prop._1()).trim();
            if (!key.startsWith("spark.") || !key.endsWith(".port")) continue;
            String withPortOffset = Integer.toString(Integer.parseInt((String)prop._2()) + portOffset);
            conf.set(key, withPortOffset);
        }
    }

    public synchronized void startWorker() {
        if (!this.workerActive) {
            String workerHost = this.myHost;
            int workerPort = this.sparkConf.getInt("spark.worker.port", 10000 + this.portOffset);
            int workerUiPort = this.sparkConf.getInt("spark.worker.webui.port", 10500 + this.portOffset);
            int workerCores = this.sparkConf.getInt("spark.worker.cores", 1);
            String workerMemory = this.getStringFromSparkConf("spark.worker.memory", "1g");
            Object[] masters = this.getSparkMastersFromCluster();
            String workerDir = this.getStringFromSparkConf("spark.worker.dir", "work");
            Worker.startSystemAndActor((String)workerHost, (int)workerPort, (int)workerUiPort, (int)workerCores, (int)Utils.memoryStringToMb((String)workerMemory), (String[])masters, (String)workerDir, (Option)None$.MODULE$, (SparkConf)this.sparkConf);
            log.info((Object)("Started SPARK WORKER in " + workerHost + ":" + workerPort + " with webUI port " + workerUiPort + " with Masters " + Arrays.toString(masters)));
            this.workerActive = true;
        } else {
            this.logDebug("Worker is already active in this node, therefore ignoring worker startup");
        }
    }

    private SparkConf initializeSparkConf(int portOffset, String propsFile) throws AnalyticsException {
        SparkConf conf = new SparkConf(false);
        log.info((Object)("Loading Spark defaults from " + propsFile));
        Map properties = Utils.getPropertiesFromFile((String)propsFile);
        conf.setAll((Traversable)properties);
        this.setAdditionalConfigs(conf);
        this.addSparkPropertiesPortOffset(conf, portOffset);
        return conf;
    }

    private void setAdditionalConfigs(SparkConf conf) throws AnalyticsException {
        String sparkClasspath;
        String carbonConfDir;
        String carbonHome = null;
        try {
            carbonHome = conf.get("carbon.das.symbolic.link");
            this.logDebug("CARBON HOME set with the symbolic link " + carbonHome);
        }
        catch (NoSuchElementException e) {
            try {
                carbonHome = CarbonUtils.getCarbonHome();
            }
            catch (Throwable ex) {
                this.logDebug("CARBON HOME can not be found. Spark conf in non-carbon environment");
            }
        }
        this.logDebug("CARBON HOME used for Spark Conf : " + carbonHome);
        if (carbonHome != null) {
            carbonConfDir = carbonHome + File.separator + "repository" + File.separator + "conf";
        } else {
            this.logDebug("CARBON HOME is NULL. Spark conf in non-carbon environment. Using the custom conf path");
            carbonConfDir = GenericUtils.getAnalyticsConfDirectory();
        }
        String analyticsSparkConfDir = carbonConfDir + File.separator + "analytics" + File.separator + "spark";
        conf.setIfMissing("spark.app.name", DEFAULT_SPARK_APP_NAME);
        conf.setIfMissing("spark.driver.cores", "1");
        conf.setIfMissing("spark.driver.memory", "512m");
        conf.setIfMissing("spark.executor.memory", "512m");
        conf.setIfMissing("spark.ui.port", "4040");
        conf.setIfMissing("spark.history.opts", "18080");
        conf.setIfMissing("spark.serializer", KryoSerializer.class.getName());
        conf.setIfMissing("spark.kryoserializer.buffer", "256k");
        conf.setIfMissing("spark.kryoserializer.buffer.max", "256m");
        conf.setIfMissing("spark.blockManager.port", "12000");
        conf.setIfMissing("spark.broadcast.port", "12500");
        conf.setIfMissing("spark.driver.port", "13000");
        conf.setIfMissing("spark.executor.port", "13500");
        conf.setIfMissing("spark.fileserver.port", "14000");
        conf.setIfMissing("spark.replClassServer.port", "14500");
        conf.setIfMissing("spark.master.port", "7077");
        conf.setIfMissing("spark.master.rest.port", "6066");
        conf.setIfMissing("spark.master.webui.port", "8081");
        conf.setIfMissing("spark.worker.cores", "1");
        conf.setIfMissing("spark.worker.memory", "1g");
        conf.setIfMissing("spark.worker.dir", "work");
        conf.setIfMissing("spark.worker.port", "11000");
        conf.setIfMissing("spark.worker.webui.port", "11500");
        conf.setIfMissing("spark.scheduler.mode", "FAIR");
        conf.setIfMissing("spark.scheduler.pool", "carbon-pool");
        conf.setIfMissing("spark.scheduler.allocation.file", analyticsSparkConfDir + File.separator + "fairscheduler.xml");
        conf.setIfMissing("spark.deploy.recoveryMode", "CUSTOM");
        conf.setIfMissing("spark.deploy.recoveryMode.factory", AnalyticsRecoveryModeFactory.class.getName());
        String agentConfPath = carbonHome + File.separator + "repository" + File.separator + "conf" + File.separator + "data-bridge" + File.separator + "data-agent-config.xml";
        String jvmOpts = " -Dwso2_custom_conf_dir=" + carbonConfDir + " -Dcarbon.home=" + carbonHome + " -D" + "disableLocalIndexQueue" + "=true" + " -DdisableIndexing=true" + " -DdisableDataPurging=true" + " -DdisableEventSink=true" + " -Djavax.net.ssl.trustStore=" + System.getProperty("javax.net.ssl.trustStore") + " -Djavax.net.ssl.trustStorePassword=" + System.getProperty("javax.net.ssl.trustStorePassword") + " -DAgent.Config.Path=" + agentConfPath + this.getLog4jPropertiesJvmOpt(analyticsSparkConfDir);
        conf.set("spark.executor.extraJavaOptions", conf.get("spark.executor.extraJavaOptions", "") + jvmOpts);
        conf.set("spark.driver.extraJavaOptions", conf.get("spark.driver.extraJavaOptions", "") + jvmOpts);
        conf.setIfMissing("carbon.spark.results.limit", "1000");
        String string = sparkClasspath = System.getProperty("SPARK_CLASSPATH") == null ? "" : System.getProperty("SPARK_CLASSPATH");
        if (carbonHome != null) {
            try {
                ClusterMode clusterMode = this.getClusterMode(conf.get("carbon.spark.master"));
                if (clusterMode != ClusterMode.local && clusterMode != ClusterMode.carbonSpark) {
                    sparkClasspath = ComputeClasspath.getSparkClasspath((String)sparkClasspath, (String)carbonHome, (String[])new String[]{"slf4j"});
                }
                sparkClasspath = ComputeClasspath.getSparkClasspath((String)sparkClasspath, (String)carbonHome);
            }
            catch (IOException e) {
                throw new AnalyticsExecutionException("Unable to create the extra spark classpath" + e.getMessage(), e);
            }
        } else {
            this.logDebug("CARBON HOME is NULL. Spark conf in non-carbon environment");
        }
        try {
            conf.set("spark.executor.extraClassPath", conf.get("spark.executor.extraClassPath") + ";" + sparkClasspath);
        }
        catch (NoSuchElementException e) {
            conf.set("spark.executor.extraClassPath", sparkClasspath);
        }
        try {
            conf.set("spark.driver.extraClassPath", conf.get("spark.driver.extraClassPath") + ";" + sparkClasspath);
        }
        catch (NoSuchElementException e) {
            conf.set("spark.driver.extraClassPath", sparkClasspath);
        }
    }

    private String getLog4jPropertiesJvmOpt(String analyticsSparkConfDir) {
        File tempFile = new File(analyticsSparkConfDir + File.separator + "log4j.properties");
        if (tempFile.exists()) {
            return " -Dlog4j.configuration=file:" + File.separator + File.separator + tempFile.getAbsolutePath();
        }
        return "";
    }

    private String getStringFromSparkConf(String config, String defaultVal) {
        try {
            return this.sparkConf.get(config);
        }
        catch (NoSuchElementException e) {
            return defaultVal;
        }
    }

    public void stop() {
        if (this.sqlCtx != null) {
            this.sqlCtx.sparkContext().stop();
        }
    }

    public int getNumPartitionsHint() throws AnalyticsException {
        int workerCount = this.getWorkerCount();
        int workerCores = this.sparkConf.getInt("spark.worker.cores", 1);
        int partitionCount = workerCount * workerCores;
        if (workerCount == 0) {
            throw new AnalyticsException("Error while calculating NumPartitionsHint. Worker count is zero.");
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("Partition count: " + partitionCount));
        }
        return partitionCount;
    }

    public AnalyticsQueryResult executeQuery(int tenantId, String query) throws AnalyticsExecutionException {
        AnalyticsClusterManager acm = AnalyticsServiceHolder.getAnalyticsClusterManager();
        if (acm.isClusteringEnabled() && !acm.isLeader(CLUSTER_GROUP_NAME)) {
            try {
                return (AnalyticsQueryResult)acm.executeOne(CLUSTER_GROUP_NAME, acm.getLeader(CLUSTER_GROUP_NAME), (Callable)new AnalyticsExecutionCall(tenantId, query));
            }
            catch (AnalyticsClusterException e) {
                throw new AnalyticsExecutionException("Error executing analytics query: " + e.getMessage(), e);
            }
        }
        return this.executeQueryLocal(tenantId, query);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private AnalyticsQueryResult executeQueryLocal(int tenantId, String query) throws AnalyticsExecutionException {
        if (tenantId != -1234 && this.isCarbonJDBCQuery(query)) {
            throw new RuntimeException("The CarbonJDBC relation provider is not available for tenants.");
        }
        if (AnalyticsDataServiceUtils.isCarbonServer()) {
            PrivilegedCarbonContext.startTenantFlow();
            PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId);
        }
        String origQuery = query.trim();
        if ((query = query.trim()).endsWith(";")) {
            query = query.substring(0, query.length() - 1).trim();
        }
        try {
            AnalyticsQueryResult analyticsQueryResult;
            if (this.checkIncrementalQuery(query)) {
                AnalyticsQueryResult analyticsQueryResult2 = this.processIncQuery(tenantId, query);
                return analyticsQueryResult2;
            }
            query = this.encodeQueryWithTenantId(tenantId, query);
            if (log.isDebugEnabled()) {
                log.debug((Object)("Executing : " + origQuery));
            }
            long start = System.currentTimeMillis();
            try {
                if (this.sqlCtx == null) {
                    throw new AnalyticsExecutionException("Spark SQL Context is not available. Check if the cluster has instantiated properly.");
                }
                this.sqlCtx.sparkContext().setLocalProperty("spark.scheduler.pool", this.sparkConf.get("spark.scheduler.pool"));
                DataFrame result = this.sqlCtx.sql(query);
                analyticsQueryResult = this.toResult(result);
            }
            catch (Throwable throwable) {
                long end = System.currentTimeMillis();
                if (ServiceHolder.isAnalyticsStatsEnabled()) {
                    log.info((Object)("Executed query: " + origQuery + " \nTime Elapsed: " + (double)(end - start) / 1000.0 + " seconds."));
                }
                throw throwable;
            }
            long end = System.currentTimeMillis();
            if (ServiceHolder.isAnalyticsStatsEnabled()) {
                log.info((Object)("Executed query: " + origQuery + " \nTime Elapsed: " + (double)(end - start) / 1000.0 + " seconds."));
            }
            return analyticsQueryResult;
        }
        finally {
            if (AnalyticsDataServiceUtils.isCarbonServer()) {
                PrivilegedCarbonContext.endTenantFlow();
            }
        }
    }

    private AnalyticsQueryResult processIncQuery(int tenantId, String query) throws AnalyticsExecutionException {
        AnalyticsQueryResult result;
        String[] splits = query.split("(\\s*,\\s*|\\s+)");
        switch (splits[0].toLowerCase()) {
            case "incremental_table_commit": {
                result = this.processIncTableCommit(tenantId, Arrays.copyOfRange(splits, 1, splits.length));
                break;
            }
            case "incremental_table_reset": {
                result = this.processIncTableReset(tenantId, Arrays.copyOfRange(splits, 1, splits.length));
                break;
            }
            case "incremental_table_show": {
                result = this.processIncTableShow(tenantId, Arrays.copyOfRange(splits, 1, splits.length));
                break;
            }
            default: {
                throw new AnalyticsExecutionException("Invalid incremental query: " + query);
            }
        }
        return result;
    }

    private AnalyticsQueryResult processIncTableShow(int tenantId, String[] tableIds) throws AnalyticsExecutionException {
        ArrayList<List<Object>> tableResults = new ArrayList<List<Object>>();
        for (String tableId : tableIds) {
            ArrayList<Object> tableResult = new ArrayList<Object>(3);
            tableResult.add(tableId);
            try {
                tableResult.add(ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(tenantId, tableId, false));
                tableResult.add(ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(tenantId, tableId, true));
            }
            catch (AnalyticsException e) {
                throw new AnalyticsExecutionException(e.getMessage(), e);
            }
            tableResults.add(tableResult);
        }
        return new AnalyticsQueryResult(new String[]{"TABLE_ID", "TEMP_VAL", "PRIMARY_VAL"}, tableResults);
    }

    private AnalyticsQueryResult processIncTableReset(int tenantId, String[] tableIds) throws AnalyticsExecutionException {
        for (String tableId : tableIds) {
            try {
                ServiceHolder.getIncrementalMetaStore().resetIncrementalTimestamps(tenantId, tableId);
            }
            catch (AnalyticsException e) {
                throw new AnalyticsExecutionException(e.getMessage(), e);
            }
        }
        return AnalyticsQueryResult.emptyAnalyticsQueryResult();
    }

    private AnalyticsQueryResult processIncTableCommit(int tenantId, String[] tableIds) throws AnalyticsExecutionException {
        ArrayList<List<Object>> tableResults = new ArrayList<List<Object>>();
        for (String tableId : tableIds) {
            ArrayList<Object> tableResult = new ArrayList<Object>(2);
            tableResult.add(tableId);
            try {
                tableResult.add(ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(tenantId, tableId, true));
                long tempTS = ServiceHolder.getIncrementalMetaStore().getLastProcessedTimestamp(tenantId, tableId, false);
                ServiceHolder.getIncrementalMetaStore().setLastProcessedTimestamp(tenantId, tableId, tempTS, true);
                tableResult.add(tempTS);
            }
            catch (AnalyticsException e) {
                throw new AnalyticsExecutionException(e.getMessage(), e);
            }
            tableResults.add(tableResult);
        }
        return new AnalyticsQueryResult(new String[]{"TABLE_ID", "PREV_PRIMARY_VAL", "NEW_PRIMARY_VAL"}, tableResults);
    }

    private boolean checkIncrementalQuery(String str) {
        return str.trim().toLowerCase().startsWith("incremental_table_");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String encodeQueryWithTenantId(int tenantId, String query) throws AnalyticsExecutionException {
        String result;
        Pattern p = Pattern.compile("(?i)(?<=(create\\stemporary\\stable))\\s+\\w+(?=\\s+using\\s\\D+\\s+options\\s*\\()");
        Matcher m = p.matcher(query.trim());
        if (m.find()) {
            String tempTableName = m.group().trim();
            if (tempTableName.matches("(?i)if")) {
                throw new AnalyticsExecutionException("Malformed query: CREATE TEMPORARY TABLE IF NOT EXISTS is not supported");
            }
            SparkTableNamesHolder sparkTableNamesHolder = this.sparkTableNamesHolder;
            synchronized (sparkTableNamesHolder) {
                this.sparkTableNamesHolder.addTableName(tenantId, tempTableName);
            }
            result = this.replaceShorthandStrings(query);
            int optStrStart = result.toLowerCase().indexOf("options", m.end());
            int bracketsOpen = result.indexOf("(", optStrStart);
            int bracketsClose = result.indexOf(")", bracketsOpen);
            String options = this.isCarbonQuery(query) ? result.substring(optStrStart, bracketsOpen + 1) + this.addTenantIdToOptions(tenantId, result.substring(bracketsOpen + 1, bracketsClose)) + ")" : result.substring(optStrStart, bracketsClose + 1);
            String beforeOptions = this.replaceTableNamesInQuery(tenantId, result.substring(0, optStrStart));
            String afterOptions = this.replaceTableNamesInQuery(tenantId, result.substring(bracketsClose + 1, result.length()));
            result = beforeOptions + options + afterOptions;
        } else {
            result = this.replaceTableNamesInQuery(tenantId, query);
        }
        return result.trim();
    }

    private boolean isCarbonQuery(String query) {
        return query.contains("CarbonAnalytics") || query.contains("CompressedEventAnalytics");
    }

    private boolean isCarbonJDBCQuery(String query) {
        Pattern pattern = Pattern.compile("using\\s*CarbonJDBC");
        Matcher matcher = pattern.matcher(query.trim());
        return matcher.find();
    }

    private String replaceShorthandStrings(String query) {
        for (Map.Entry<String, String> entry : this.shorthandStringsMap.entrySet()) {
            query = query.replaceFirst("\\b" + entry.getKey() + "\\b", entry.getValue());
        }
        return query;
    }

    private void registerShorthandStrings() {
        this.addShorthandString("CarbonAnalytics", AnalyticsRelationProvider.class.getName());
        this.addShorthandString("CarbonJDBC", AnalyticsJDBCRelationProvider.class.getName());
        this.addShorthandString("CompressedEventAnalytics", CompressedEventAnalyticsRelationProvider.class.getName());
    }

    private void addShorthandString(String shorthand, String className) {
        try {
            Class.forName(className);
            this.shorthandStringsMap.put(shorthand, className);
        }
        catch (ClassNotFoundException e) {
            log.error((Object)e);
        }
    }

    private String addTenantIdToOptions(int tenantId, String optStr) throws AnalyticsExecutionException {
        String[] opts = optStr.split("\\s*,\\s*");
        boolean hasTenantId = false;
        for (String option : opts) {
            String[] splits = option.trim().split("\\s+", 2);
            hasTenantId = splits[0].equals("tenantId");
            if (!hasTenantId || tenantId == Integer.parseInt(splits[1].replaceAll("^\"|\"$", ""))) continue;
            throw new AnalyticsExecutionException("Mismatching tenants : " + tenantId + " and " + splits[1].replaceAll("^\"|\"$", ""));
        }
        if (!hasTenantId) {
            optStr = optStr + " , " + "tenantId" + " \"" + tenantId + "\"";
        }
        return optStr;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String replaceTableNamesInQuery(int tenantId, String query) {
        String result = query;
        SparkTableNamesHolder sparkTableNamesHolder = this.sparkTableNamesHolder;
        synchronized (sparkTableNamesHolder) {
            Collection<String> tableNames = this.sparkTableNamesHolder.getTableNames(tenantId);
            for (String name : tableNames) {
                result = result.replaceAll("\\b" + name + "\\b", AnalyticsCommonUtils.encodeTableNameWithTenantId(tenantId, name));
            }
            return result;
        }
    }

    private AnalyticsQueryResult toResult(DataFrame dataFrame) throws AnalyticsExecutionException {
        int resultsLimit = this.sparkConf.getInt("carbon.spark.results.limit", -1);
        if (resultsLimit != -1) {
            return new AnalyticsQueryResult(dataFrame.schema().fieldNames(), this.convertRowsToObjects(dataFrame.limit(resultsLimit).collect()));
        }
        return new AnalyticsQueryResult(dataFrame.schema().fieldNames(), this.convertRowsToObjects(dataFrame.collect()));
    }

    private List<List<Object>> convertRowsToObjects(Row[] rows) {
        ArrayList<List<Object>> result = new ArrayList<List<Object>>();
        for (Row row : rows) {
            ArrayList<Object> objects = new ArrayList<Object>();
            for (int i = 0; i < row.length(); ++i) {
                objects.add(row.get(i));
            }
            result.add(objects);
        }
        return result;
    }

    public void onBecomingLeader() {
        boolean isFailed;
        log.info((Object)"This node is now the CARBON CLUSTERING LEADER");
        boolean bl = isFailed = !this.executeOnBecomingLeaderFlow();
        for (int retries = 0; isFailed && retries < 30; ++retries) {
            log.info((Object)("Retrying executing On Becoming Leader flow. Retry count = " + retries));
            long waitTime = Math.min(this.getWaitTimeExp(retries), 60000L);
            this.retryWait(waitTime);
            isFailed = !this.executeOnBecomingLeaderFlow();
        }
    }

    private boolean executeOnBecomingLeaderFlow() {
        if (log.isDebugEnabled()) {
            log.debug((Object)"Executing On Becoming Leader Flow : ");
        }
        try {
            if (this.clusterMode == ClusterMode.carbonSpark) {
                HazelcastInstance hz = AnalyticsServiceHolder.getHazelcastInstance();
                IMap masterMap = hz.getMap("__SPARK_MASTER_MAP__");
                if (masterMap.isEmpty()) {
                    log.info((Object)"Spark master map is empty...");
                    String masterUrl = "spark://" + this.myHost + ":" + this.sparkConf.getInt("spark.master.port", 7077 + this.portOffset);
                    masterMap.put((Object)masterUrl, this.acm.getLocalMember());
                    log.info((Object)("Added " + masterUrl + " to the MasterMap"));
                } else if (masterMap.size() >= this.redundantMasterCount) {
                    log.info((Object)("Redundant master count fulfilled : " + masterMap.size()));
                    if (!this.isElectedLeaderAvailable()) {
                        log.info((Object)"No Elected SPARK LEADER in the cluster. Electing a suitable leader...");
                        try {
                            this.electSuitableLeader();
                        }
                        catch (AnalyticsClusterException e) {
                            String msg = "Unable to elect a suitable leader : " + e.getMessage();
                            log.error((Object)msg, (Throwable)e);
                            throw new RuntimeException(msg, e);
                        }
                    }
                    log.info((Object)"Initializing new spark client app...");
                    this.initializeAnalyticsClient();
                } else {
                    log.info((Object)"Master map size is less than the redundant master count");
                }
            } else {
                log.info((Object)"Analytics cluster leadership has changed. Hence, re-creating the Spark Client application");
                this.initializeAnalyticsClient();
            }
            return true;
        }
        catch (Exception e) {
            String msg = "Error in processing on becoming leader cluster message: " + e.getMessage();
            log.warn((Object)msg, (Throwable)e);
            return false;
        }
    }

    private void electSuitableLeader() throws AnalyticsClusterException {
        HazelcastInstance hz = AnalyticsServiceHolder.getHazelcastInstance();
        IMap masterMap = hz.getMap("__SPARK_MASTER_MAP__");
        ArrayList masterMembers = new ArrayList(masterMap.values());
        List groupMembers = this.acm.getMembers(CLUSTER_GROUP_NAME);
        boolean foundSuitableMaster = false;
        for (Object masterMember : masterMembers) {
            if (!groupMembers.contains(masterMember)) continue;
            this.acm.executeOne(CLUSTER_GROUP_NAME, masterMember, (Callable)new ElectLeaderExecutionCall());
            foundSuitableMaster = true;
            log.info((Object)("Suitable leader elected : " + masterMember));
            break;
        }
        if (!foundSuitableMaster) {
            log.error((Object)"No Spark master is available in the cluster to be elected as the leader");
        }
    }

    public synchronized void electAsLeader() {
        log.info((Object)"Elected as the Spark Leader");
        for (LeaderElectable le : this.leaderElectable) {
            le.electedLeader();
        }
        this.electedLeader = true;
    }

    public void onLeaderUpdate() {
    }

    private int getWorkerCount() {
        return this.workerCount;
    }

    public void onMembersChangeForLeader(boolean removedMember) {
        boolean isFailed;
        log.info((Object)("Member change, remove: " + removedMember));
        boolean bl = isFailed = !this.executeOnMembersChangeForLeaderFlow(removedMember);
        for (int retries = 0; isFailed && retries < 30; ++retries) {
            log.info((Object)("Retrying executing On Member Change for Leader Flow. Retry count = " + retries));
            long waitTime = Math.min(this.getWaitTimeExp(retries), 60000L);
            this.retryWait(waitTime);
            isFailed = !this.executeOnMembersChangeForLeaderFlow(removedMember);
        }
    }

    private boolean executeOnMembersChangeForLeaderFlow(boolean removedMember) {
        this.logDebug("Execute On Members Change For Leader Flow");
        try {
            if (this.clusterMode == ClusterMode.carbonSpark) {
                this.workerCount = AnalyticsServiceHolder.getAnalyticsClusterManager().getMembers(CLUSTER_GROUP_NAME).size();
                log.info((Object)("Analytics worker updated, total count: " + this.getWorkerCount()));
                if (removedMember) {
                    if (!this.isElectedLeaderAvailable()) {
                        log.info((Object)"Removed member was the Spark elected leader. Electing a suitable leader...");
                        this.electSuitableLeader();
                    } else {
                        log.info((Object)"Elected leader already available.");
                    }
                }
            }
            return true;
        }
        catch (Exception e) {
            String msg = "Error while executing On Members Change For Leader Flow: " + e.getMessage();
            log.warn((Object)msg, (Throwable)e);
            return false;
        }
    }

    public void registerLeaderElectable(LeaderElectable le) {
        this.leaderElectable.add(le);
        log.info((Object)"Spark leader electable registered");
    }

    private void logDebug(String msg) {
        if (log.isDebugEnabled()) {
            log.debug((Object)msg);
        }
    }

    public void onMemberRemoved() {
    }

    private long getWaitTimeExp(int retryCount) {
        return (long)Math.pow(2.0, retryCount) * 100L;
    }

    private void retryWait(long waitTime) {
        try {
            Thread.sleep(waitTime);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private ClusterMode getClusterMode(String sparkMaster) throws AnalyticsExecutionException {
        if (sparkMaster.toLowerCase().startsWith("local")) {
            if (this.acm.isClusteringEnabled()) {
                log.warn((Object)"Using 'local' with Carbon clustering is deprecated. Please use 'carbon.spark.master carbon' instead!");
                return ClusterMode.carbonSpark;
            }
            return ClusterMode.local;
        }
        if (sparkMaster.toLowerCase().startsWith("carbon")) {
            if (!this.acm.isClusteringEnabled()) {
                throw new AnalyticsExecutionException("Using Carbon Clustering without enabling clustering in axis2. Please refer axis2 settings.");
            }
            return ClusterMode.carbonSpark;
        }
        if (sparkMaster.toLowerCase().startsWith("spark")) {
            return ClusterMode.standaloneSpark;
        }
        if (sparkMaster.toLowerCase().startsWith("yarn")) {
            if (sparkMaster.equalsIgnoreCase("yarn-cluster")) {
                throw new AnalyticsExecutionException("\"yarn-cluster\" mode is not supported in DAS. Please use \"yarn-cluster\"!");
            }
            return ClusterMode.yarn;
        }
        if (sparkMaster.toLowerCase().startsWith("mesos")) {
            return ClusterMode.mesos;
        }
        throw new AnalyticsExecutionException("Unknown cluster mode for Spark : " + sparkMaster);
    }

    private static enum ClusterMode {
        local("Local"),
        carbonSpark("Carbon Spark"),
        standaloneSpark("Standalone Spark"),
        yarn("Spark on YARN"),
        mesos("Spark on Mesos");

        private String name;

        private ClusterMode(String name) {
            this.name = name;
        }

        private String getValue() {
            return this.name;
        }

        public String toString() {
            return this.getValue();
        }
    }
}

