/*
 * Decompiled with CFR 0.152.
 */
package cn.tenmg.flink.jobs.operator;

import cn.tenmg.dsl.NamedScript;
import cn.tenmg.dsl.ParamsParser;
import cn.tenmg.dsl.utils.DSLUtils;
import cn.tenmg.dsl.utils.StringUtils;
import cn.tenmg.flink.jobs.context.FlinkJobsContext;
import cn.tenmg.flink.jobs.exception.IllegalConfigurationException;
import cn.tenmg.flink.jobs.kit.ParamsKit;
import cn.tenmg.flink.jobs.model.DataSync;
import cn.tenmg.flink.jobs.model.data.sync.Column;
import cn.tenmg.flink.jobs.operator.AbstractOperator;
import cn.tenmg.flink.jobs.operator.data.sync.MetaDataGetter;
import cn.tenmg.flink.jobs.operator.data.sync.MetaDataGetterFactory;
import cn.tenmg.flink.jobs.parser.FlinkSQLParamsParser;
import cn.tenmg.flink.jobs.utils.ConfigurationUtils;
import cn.tenmg.flink.jobs.utils.MapUtils;
import cn.tenmg.flink.jobs.utils.SQLUtils;
import cn.tenmg.flink.jobs.utils.StreamTableEnvironmentUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSyncOperator
extends AbstractOperator<DataSync> {
    private static Logger log = LoggerFactory.getLogger(DataSyncOperator.class);
    private static final String SMART_KEY = "data.sync.smart";
    private static final String FROM_TABLE_PREFIX_KEY = "data.sync.from_table_prefix";
    private static final String TOPIC_KEY = "topic";
    private static final String GROUP_ID_KEY = "properties.group.id";
    private static final String GROUP_ID_PREFIX_KEY = "data.sync.group_id_prefix";
    private static final String TIMESTAMP_COLUMNS = "data.sync.timestamp.columns";
    private static final String TIMESTAMP_COLUMNS_SPLIT = ",";
    private static final String TIMESTAMP_FROM_TYPE_KEY = "data.sync.timestamp.from_type";
    private static final String TIMESTAMP_TO_TYPE_KEY = "data.sync.timestamp.to_type";
    private static final String TYPE_KEY_PREFIX = "data.sync.";
    private static final String TO_TYPE_KEY_SUFFIX = ".to_type";
    private static final String FROM_TYPE_KEY_SUFFIX = ".from_type";
    private static final String SCRIPT_KEY_SUFFIX = ".script";
    private static final String STRATEGY_KEY_SUFFIX = ".strategy";
    private static final String COLUMN_NAME = "columnName";
    private static final boolean TO_LOWERCASE = Boolean.valueOf(FlinkJobsContext.getProperty("data.sync.timestamp.case_sensitive")) == false;
    private static final Map<String, ColumnConvertArgs> columnConvertArgsMap = new HashMap<String, ColumnConvertArgs>();

    @Override
    public Object execute(StreamExecutionEnvironment env, DataSync dataSync, Map<String, Object> params) throws Exception {
        Configuration configuration;
        String pipelineName;
        String from = dataSync.getFrom();
        String to = dataSync.getTo();
        String table = dataSync.getTable();
        if (StringUtils.isBlank((String)from) || StringUtils.isBlank((String)to) || StringUtils.isBlank((String)table)) {
            throw new IllegalArgumentException("The property 'from', 'to' or 'table' cannot be blank.");
        }
        StreamTableEnvironment tableEnv = FlinkJobsContext.getOrCreateStreamTableEnvironment(env);
        String fromTable = FlinkJobsContext.getProperty(FROM_TABLE_PREFIX_KEY) + table;
        String fromConfig = dataSync.getFromConfig();
        StreamTableEnvironmentUtils.useCatalogOrDefault(tableEnv, null);
        TableConfig tableConfig = tableEnv.getConfig();
        if (tableConfig != null && StringUtils.isBlank((String)(pipelineName = (String)(configuration = tableConfig.getConfiguration()).get(PipelineOptions.NAME)))) {
            configuration.set(PipelineOptions.NAME, (Object)("data-sync." + String.join((CharSequence)".", String.join((CharSequence)"-", from, "to", to), table)));
        }
        Map<String, String> fromDataSource = FlinkJobsContext.getDatasource(from);
        Map<String, String> toDataSource = FlinkJobsContext.getDatasource(to);
        String primaryKey = DataSyncOperator.collation(dataSync, fromDataSource, toDataSource, params);
        List<Column> columns = dataSync.getColumns();
        String sql = DataSyncOperator.fromCreateTableSQL(fromDataSource, dataSync.getTopic(), table, fromTable, columns, primaryKey, fromConfig);
        if (log.isInfoEnabled()) {
            log.info("Create source table by Flink SQL: " + SQLUtils.hiddePassword(sql));
            tableEnv.executeSql(sql);
            sql = DataSyncOperator.toCreateTableSQL(toDataSource, table, columns, primaryKey, dataSync.getToConfig());
            log.info("Create sink table by Flink SQL: " + SQLUtils.hiddePassword(sql));
            tableEnv.executeSql(sql);
            sql = DataSyncOperator.insertSQL(table, fromTable, columns, params);
            log.info("Execute Flink SQL: " + SQLUtils.hiddePassword(sql));
        } else {
            tableEnv.executeSql(sql);
            sql = DataSyncOperator.toCreateTableSQL(toDataSource, table, columns, primaryKey, dataSync.getToConfig());
            tableEnv.executeSql(sql);
            sql = DataSyncOperator.insertSQL(table, fromTable, columns, params);
        }
        return tableEnv.executeSql(sql);
    }

    private static String collation(DataSync dataSync, Map<String, String> fromDataSource, Map<String, String> toDataSource, Map<String, Object> params) throws Exception {
        Map<String, String> timestampMap;
        Boolean smart;
        List<Column> columns = dataSync.getColumns();
        if (columns == null) {
            columns = new ArrayList<Column>();
            dataSync.setColumns(columns);
        }
        if ((smart = dataSync.getSmart()) == null) {
            smart = Boolean.valueOf(FlinkJobsContext.getProperty(SMART_KEY));
        }
        String primaryKey = dataSync.getPrimaryKey();
        String timestamp = dataSync.getTimestamp();
        boolean customTimestampBlank = StringUtils.isBlank((String)timestamp);
        if (customTimestampBlank) {
            timestamp = DataSyncOperator.getDefaultTimestamp();
        }
        Map<String, String> map = timestampMap = StringUtils.isBlank((String)timestamp) ? Collections.emptyMap() : DataSyncOperator.toMap(TO_LOWERCASE, timestamp.split(TIMESTAMP_COLUMNS_SPLIT));
        if (Boolean.TRUE.equals(smart)) {
            MetaDataGetter metaDataGetter = MetaDataGetterFactory.getMetaDataGetter(toDataSource);
            MetaDataGetter.TableMetaData tableMetaData = metaDataGetter.getTableMetaData(toDataSource, dataSync.getTable());
            Set<String> primaryKeys = tableMetaData.getPrimaryKeys();
            if (primaryKey == null && primaryKeys != null && !primaryKeys.isEmpty()) {
                primaryKey = String.join((CharSequence)TIMESTAMP_COLUMNS_SPLIT, primaryKeys);
            }
            Map<String, String> columnsMap = tableMetaData.getColumns();
            if (columns.isEmpty()) {
                DataSyncOperator.addSmartLoadColumns(columns, columnsMap, params, timestampMap);
            } else {
                DataSyncOperator.collationPartlyCustom(columns, params, columnsMap, timestampMap);
            }
        } else {
            if (columns.isEmpty()) {
                throw new IllegalArgumentException("At least one column must be configured in manual mode, or set the configuration 'data.sync.smart=true' at " + FlinkJobsContext.getConfigurationFile() + " to enable automatic column acquisition in smart mode");
            }
            DataSyncOperator.collationCustom(columns, params, timestampMap);
        }
        if (!customTimestampBlank) {
            for (String columnName : timestampMap.values()) {
                Column column = new Column();
                column.setFromName(columnName);
                column.setToName(columnName);
                columnName = TO_LOWERCASE ? columnName.toLowerCase() : columnName;
                column.setFromType(DataSyncOperator.getDefaultTimestampFromType(columnName));
                column.setToType(DataSyncOperator.getDefaultTimestampToType(columnName));
                columns.add(column);
            }
        }
        return primaryKey;
    }

    private static void collationPartlyCustom(List<Column> columns, Map<String, Object> params, Map<String, String> columnsMap, Map<String, String> timestampMap) {
        int size = columns.size();
        for (int i = 0; i < size; ++i) {
            Column column = columns.get(i);
            String strategy = column.getStrategy();
            if ("from".equals(strategy)) {
                DataSyncOperator.collationPartlyCustomFromStrategy(column, i, params, columnsMap, timestampMap);
            } else if ("to".equals(strategy)) {
                DataSyncOperator.collationPartlyCustomToStratagy(column, i, params, columnsMap, timestampMap);
            } else {
                DataSyncOperator.collationPartlyCustomBothStratagy(column, i, params, columnsMap, timestampMap);
            }
            DataSyncOperator.wrapColumnName(column);
        }
        DataSyncOperator.addSmartLoadColumns(columns, columnsMap, params, timestampMap);
    }

    private static void collationPartlyCustomFromStrategy(Column column, int index, Map<String, Object> params, Map<String, String> columnsMap, Map<String, String> timestampMap) {
        String columnName;
        String fromName = column.getFromName();
        if (StringUtils.isBlank((String)fromName)) {
            throw new IllegalArgumentException("The property 'fromName' cannot be blank, column index: " + index);
        }
        String fromType = column.getFromType();
        String string = columnName = TO_LOWERCASE ? fromName.toLowerCase() : fromName;
        if (timestampMap.containsKey(columnName)) {
            if (StringUtils.isBlank((String)fromType)) {
                column.setFromType(DataSyncOperator.getDefaultTimestampFromType(columnName));
            }
            timestampMap.remove(columnName);
        } else if (StringUtils.isBlank((String)fromType)) {
            throw new IllegalArgumentException("The property 'fromType' cannot be blank, column index: " + index);
        }
        columnsMap.remove(fromName);
    }

    private static void collationPartlyCustomToStratagy(Column column, int index, Map<String, Object> params, Map<String, String> columnsMap, Map<String, String> timestampMap) {
        String columnName;
        String toName = column.getToName();
        if (StringUtils.isBlank((String)toName)) {
            throw new IllegalArgumentException("The property 'toName' cannot be blank, column index: " + index);
        }
        String toType = columnsMap.get(toName);
        String string = columnName = TO_LOWERCASE ? toName.toLowerCase() : toName;
        if (timestampMap.containsKey(columnName)) {
            if (StringUtils.isBlank((String)column.getToType())) {
                column.setToType(toType == null ? DataSyncOperator.getDefaultTimestampToType(columnName) : toType);
            }
            if (StringUtils.isBlank((String)column.getScript())) {
                column.setScript(DataSyncOperator.getDefaultTimestampScript(columnName));
            }
            timestampMap.remove(columnName);
        } else {
            if (toType == null && StringUtils.isBlank((String)column.getToType())) {
                throw new IllegalArgumentException("The property 'toType' cannot be blank, column index: " + index);
            }
            if (StringUtils.isBlank((String)column.getToType())) {
                column.setToType(toType);
            }
        }
        columnsMap.remove(toName);
    }

    private static void collationPartlyCustomBothStratagy(Column column, int index, Map<String, Object> params, Map<String, String> columnsMap, Map<String, String> timestampMap) {
        String fromName = column.getFromName();
        String toName = column.getToName();
        if (StringUtils.isBlank((String)fromName)) {
            if (StringUtils.isBlank((String)toName)) {
                throw new IllegalArgumentException("One of the properties 'fromName' or 'toName' cannot be blank, column index: " + index);
            }
            column.setFromName(toName);
        } else if (StringUtils.isBlank((String)toName)) {
            column.setToName(fromName);
        }
        String columnName = TO_LOWERCASE ? column.getToName().toLowerCase() : column.getToName();
        String toType = columnsMap.get(column.getToName());
        if (timestampMap.containsKey(columnName)) {
            if (StringUtils.isBlank((String)column.getFromType())) {
                column.setFromType(DataSyncOperator.getDefaultTimestampFromType(columnName));
            }
            if (StringUtils.isBlank((String)column.getToType())) {
                column.setToType(toType == null ? DataSyncOperator.getDefaultTimestampToType(columnName) : toType);
            }
            if (StringUtils.isBlank((String)column.getScript())) {
                column.setScript(DataSyncOperator.getDefaultTimestampScript(columnName));
            }
            timestampMap.remove(columnName);
        } else if (toType == null) {
            String fromType = column.getFromType();
            toType = column.getToType();
            if (StringUtils.isBlank((String)fromType)) {
                if (StringUtils.isBlank((String)toType)) {
                    throw new IllegalArgumentException("One of the properties 'fromType' or 'toType' cannot be blank, column index: " + index);
                }
                column.setFromType(toType);
            } else if (StringUtils.isBlank((String)toType)) {
                column.setToType(fromType);
            }
        } else {
            if (StringUtils.isBlank((String)column.getToType())) {
                column.setToType(toType);
            }
            ColumnConvertArgs columnConvertArgs = columnConvertArgsMap.get(DataSyncOperator.getDataType(toType).toUpperCase());
            String fromType = column.getFromType();
            if (columnConvertArgs == null) {
                if (StringUtils.isBlank((String)fromType)) {
                    column.setFromType(toType);
                }
            } else if (StringUtils.isBlank((String)fromType)) {
                column.setFromType(columnConvertArgs.fromType);
                if (StringUtils.isBlank((String)column.getScript())) {
                    column.setScript(columnConvertArgs.script);
                }
            } else if (columnConvertArgs.fromType.equalsIgnoreCase(DataSyncOperator.getDataType(fromType)) && StringUtils.isBlank((String)column.getScript())) {
                column.setScript(columnConvertArgs.script);
            }
            columnsMap.remove(column.getToName());
        }
    }

    private static void collationCustom(List<Column> columns, Map<String, Object> params, Map<String, String> timestampMap) {
        int size = columns.size();
        for (int i = 0; i < size; ++i) {
            Column column = columns.get(i);
            String strategy = column.getStrategy();
            if ("from".equals(strategy)) {
                DataSyncOperator.collationCustomFromStrategy(column, i, params, timestampMap);
            } else if ("to".equals(strategy)) {
                DataSyncOperator.collationCustomToStrategy(column, i, params, timestampMap);
            } else {
                DataSyncOperator.collationCustomBothStrategy(column, i, params, timestampMap);
            }
            DataSyncOperator.wrapColumnName(column);
        }
    }

    private static void collationCustomFromStrategy(Column column, int index, Map<String, Object> params, Map<String, String> timestampMap) {
        String columnName;
        String fromName = column.getFromName();
        if (StringUtils.isBlank((String)fromName)) {
            throw new IllegalArgumentException("The property 'fromName' cannot be blank, column index: " + index);
        }
        String fromType = column.getFromType();
        String string = columnName = TO_LOWERCASE ? fromName.toLowerCase() : fromName;
        if (timestampMap.containsKey(columnName)) {
            if (StringUtils.isBlank((String)fromType)) {
                column.setFromType(DataSyncOperator.getDefaultTimestampFromType(columnName));
            }
            timestampMap.remove(columnName);
        } else if (StringUtils.isBlank((String)fromType)) {
            throw new IllegalArgumentException("The property 'fromType' cannot be blank, column index: " + index);
        }
    }

    private static void collationCustomToStrategy(Column column, int index, Map<String, Object> params, Map<String, String> timestampMap) {
        String columnName;
        String toName = column.getToName();
        if (StringUtils.isBlank((String)toName)) {
            throw new IllegalArgumentException("The property 'toName' cannot be blank, column index: " + index);
        }
        String string = columnName = TO_LOWERCASE ? toName.toLowerCase() : toName;
        if (timestampMap.containsKey(columnName)) {
            if (StringUtils.isBlank((String)column.getToType())) {
                column.setToType(DataSyncOperator.getDefaultTimestampToType(columnName));
            }
            if (StringUtils.isBlank((String)column.getScript())) {
                column.setScript(DataSyncOperator.getDefaultTimestampScript(columnName));
            }
            timestampMap.remove(columnName);
        } else if (StringUtils.isBlank((String)column.getToType())) {
            throw new IllegalArgumentException("The property 'toType' cannot be blank, column index: " + index);
        }
    }

    private static void collationCustomBothStrategy(Column column, int index, Map<String, Object> params, Map<String, String> timestampMap) {
        String columnName;
        String fromName = column.getFromName();
        String toName = column.getToName();
        if (StringUtils.isBlank((String)fromName)) {
            if (StringUtils.isBlank((String)toName)) {
                throw new IllegalArgumentException("One of the properties 'fromName' or 'toName' cannot be blank, column index: " + index);
            }
            column.setFromName(toName);
        } else if (StringUtils.isBlank((String)toName)) {
            column.setToName(fromName);
        }
        String string = columnName = TO_LOWERCASE ? column.getToName().toLowerCase() : column.getToName();
        if (timestampMap.containsKey(columnName)) {
            if (StringUtils.isBlank((String)column.getFromType())) {
                column.setFromType(DataSyncOperator.getDefaultTimestampFromType(columnName));
            }
            if (StringUtils.isBlank((String)column.getToType())) {
                column.setToType(DataSyncOperator.getDefaultTimestampToType(columnName));
            }
            if (StringUtils.isBlank((String)column.getScript())) {
                column.setScript(DataSyncOperator.getDefaultTimestampScript(columnName));
            }
            timestampMap.remove(columnName);
        } else {
            String fromType = column.getFromType();
            String toType = column.getToType();
            if (StringUtils.isBlank((String)fromType)) {
                if (StringUtils.isBlank((String)toType)) {
                    throw new IllegalArgumentException("One of the properties 'fromType' or 'toType' cannot be blank, column index: " + index);
                }
                column.setFromType(toType);
            } else if (StringUtils.isBlank((String)toType)) {
                column.setToType(fromType);
            }
            ColumnConvertArgs columnConvertArgs = columnConvertArgsMap.get(DataSyncOperator.getDataType(column.getToType()).toUpperCase());
            if (columnConvertArgs != null && columnConvertArgs.fromType.equalsIgnoreCase(DataSyncOperator.getDataType(column.getFromType()))) {
                column.setFromType(columnConvertArgs.fromType);
                if (StringUtils.isBlank((String)column.getScript())) {
                    column.setScript(columnConvertArgs.script);
                }
            }
        }
    }

    private static void addSmartLoadColumns(List<Column> columns, Map<String, String> columnsMap, Map<String, Object> params, Map<String, String> timestampMap) {
        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
            String columnName;
            String toName = entry.getKey();
            String toType = entry.getValue();
            Column column = new Column();
            column.setToName(toName);
            column.setToType(toType);
            String string = columnName = TO_LOWERCASE ? toName.toLowerCase() : toName;
            if (timestampMap.containsKey(columnName)) {
                String strategy = DataSyncOperator.getDefaultColumnStrategy(columnName);
                column.setStrategy(strategy);
                if (!"to".equals(strategy)) {
                    column.setFromName(toName);
                    column.setFromType(DataSyncOperator.getDefaultTimestampFromType(columnName));
                }
                if (!"from".equals(strategy) && StringUtils.isBlank((String)column.getScript())) {
                    column.setScript(DataSyncOperator.getDefaultTimestampScript(columnName));
                }
                timestampMap.remove(columnName);
            } else {
                column.setFromName(toName);
                ColumnConvertArgs columnConvertArgs = columnConvertArgsMap.get(DataSyncOperator.getDataType(toType).toUpperCase());
                if (columnConvertArgs == null) {
                    column.setFromType(toType);
                } else {
                    column.setFromType(columnConvertArgs.fromType);
                    column.setScript(columnConvertArgs.script);
                }
            }
            DataSyncOperator.wrapColumnName(column);
            columns.add(column);
        }
    }

    private static String getDefaultTimestamp() {
        return FlinkJobsContext.getProperty(TIMESTAMP_COLUMNS);
    }

    private static String getDefaultColumnStrategy(String columnName) {
        return FlinkJobsContext.getProperty(TYPE_KEY_PREFIX + columnName + STRATEGY_KEY_SUFFIX);
    }

    private static String getDefaultTimestampFromType(String columnName) {
        String fromType = FlinkJobsContext.getProperty(TYPE_KEY_PREFIX + columnName + FROM_TYPE_KEY_SUFFIX);
        if (fromType == null) {
            return FlinkJobsContext.getProperty(TIMESTAMP_FROM_TYPE_KEY);
        }
        return fromType;
    }

    private static String getDefaultTimestampToType(String columnName) {
        String toType = FlinkJobsContext.getProperty(TYPE_KEY_PREFIX + columnName + TO_TYPE_KEY_SUFFIX);
        if (toType == null) {
            return FlinkJobsContext.getProperty(TIMESTAMP_TO_TYPE_KEY);
        }
        return toType;
    }

    private static String getDefaultTimestampScript(String columnName) {
        return FlinkJobsContext.getProperty(TYPE_KEY_PREFIX + columnName + SCRIPT_KEY_SUFFIX);
    }

    private static String fromCreateTableSQL(Map<String, String> dataSource, String topic, String table, String fromTable, List<Column> columns, String primaryKey, String fromConfig) throws IOException {
        Column column;
        StringBuffer sqlBuffer = new StringBuffer();
        sqlBuffer.append("CREATE TABLE ").append(fromTable).append("(");
        int i = 0;
        int size = columns.size();
        while (i < size) {
            if ("to".equals((column = columns.get(i++)).getStrategy())) continue;
            sqlBuffer.append(column.getFromName()).append(' ').append(column.getFromType());
            break;
        }
        while (i < size) {
            if ("to".equals((column = columns.get(i++)).getStrategy())) continue;
            sqlBuffer.append(',').append(' ').append(column.getFromName()).append(' ').append(column.getFromType());
        }
        if (StringUtils.isNotBlank((String)primaryKey)) {
            sqlBuffer.append(',').append(' ').append("PRIMARY KEY (").append(primaryKey).append(") NOT ENFORCED");
        }
        sqlBuffer.append(") ").append("WITH (");
        HashMap<String, String> actualDataSource = MapUtils.newHashMap(dataSource);
        if (StringUtils.isBlank((String)fromConfig)) {
            if (ConfigurationUtils.isKafka(actualDataSource)) {
                actualDataSource.put(GROUP_ID_KEY, FlinkJobsContext.getProperty(GROUP_ID_PREFIX_KEY) + table);
            }
            if (topic != null) {
                actualDataSource.put(TOPIC_KEY, topic);
            }
            SQLUtils.appendDataSource(sqlBuffer, actualDataSource);
        } else {
            Map<String, String> config = ConfigurationUtils.load(fromConfig);
            MapUtils.removeAll(actualDataSource, config.keySet());
            if (!config.containsKey(GROUP_ID_KEY) && ConfigurationUtils.isKafka(actualDataSource)) {
                actualDataSource.put(GROUP_ID_KEY, FlinkJobsContext.getProperty(GROUP_ID_PREFIX_KEY) + table);
            }
            if (topic != null && !config.containsKey(TOPIC_KEY)) {
                actualDataSource.put(TOPIC_KEY, topic);
            }
            SQLUtils.appendDataSource(sqlBuffer, actualDataSource);
            sqlBuffer.append(',').append(' ').append(fromConfig);
        }
        sqlBuffer.append(")");
        return sqlBuffer.toString();
    }

    private static String toCreateTableSQL(Map<String, String> dataSource, String table, List<Column> columns, String primaryKey, String toConfig) throws IOException {
        String toName;
        Column column;
        StringBuffer sqlBuffer = new StringBuffer();
        sqlBuffer.append("CREATE TABLE ").append(table).append("(");
        int i = 0;
        int size = columns.size();
        while (i < size) {
            if ("from".equals((column = columns.get(i++)).getStrategy())) continue;
            toName = column.getToName();
            sqlBuffer.append(toName == null ? column.getFromName() : toName).append(' ').append(column.getToType());
            break;
        }
        while (i < size) {
            if ("from".equals((column = columns.get(i++)).getStrategy())) continue;
            toName = column.getToName();
            sqlBuffer.append(',').append(' ').append(toName == null ? column.getFromName() : toName).append(' ').append(column.getToType());
        }
        if (StringUtils.isNotBlank((String)primaryKey)) {
            sqlBuffer.append(',').append(' ').append("PRIMARY KEY (").append(primaryKey).append(") NOT ENFORCED");
        }
        sqlBuffer.append(") ").append("WITH (");
        HashMap<String, String> actualDataSource = MapUtils.newHashMap(dataSource);
        actualDataSource.put("table-name", table);
        if (StringUtils.isBlank((String)toConfig)) {
            SQLUtils.appendDataSource(sqlBuffer, actualDataSource);
        } else {
            Map<String, String> config = ConfigurationUtils.load(toConfig);
            MapUtils.removeAll(actualDataSource, config.keySet());
            SQLUtils.appendDataSource(sqlBuffer, actualDataSource);
            sqlBuffer.append(',').append(' ').append(toConfig);
        }
        sqlBuffer.append(")");
        return sqlBuffer.toString();
    }

    private static String insertSQL(String table, String fromTable, List<Column> columns, Map<String, Object> params) {
        Column column;
        StringBuffer sqlBuffer = new StringBuffer();
        sqlBuffer.append("INSERT INTO ").append(table).append(' ').append("(");
        boolean needComma = false;
        int size = columns.size();
        for (int i = 0; i < size; ++i) {
            column = columns.get(i);
            String toName = column.getToName();
            if ("from".equals(column.getStrategy())) continue;
            if (needComma) {
                sqlBuffer.append(',');
            } else {
                needComma = true;
            }
            sqlBuffer.append(' ').append(toName == null ? column.getFromName() : toName);
        }
        sqlBuffer.append(") SELECT ");
        needComma = false;
        int size2 = columns.size();
        for (int i = 0; i < size2; ++i) {
            column = columns.get(i);
            String script = column.getScript();
            if ("from".equals(column.getStrategy())) continue;
            if (needComma) {
                sqlBuffer.append(',');
            } else {
                needComma = true;
            }
            sqlBuffer.append(' ').append(StringUtils.isBlank((String)script) ? column.getFromName() : DataSyncOperator.toScript(script, column.getFromName(), params));
        }
        sqlBuffer.append(" FROM ").append(fromTable);
        return sqlBuffer.toString();
    }

    private static String toScript(String dsl, String columnName, Map<String, Object> params) {
        NamedScript namedScript = DSLUtils.parse((String)dsl, ParamsKit.init(params).put(COLUMN_NAME, columnName).get());
        return DSLUtils.toScript((String)namedScript.getScript(), (Map)namedScript.getParams(), (ParamsParser)FlinkSQLParamsParser.getInstance()).getValue();
    }

    public static final Map<String, String> toMap(boolean toLowercase, String ... strings) {
        HashMap<String, String> map = new HashMap<String, String>();
        if (toLowercase) {
            for (int i = 0; i < strings.length; ++i) {
                String string = strings[i].trim();
                map.put(string.toLowerCase(), string);
            }
        } else {
            for (int i = 0; i < strings.length; ++i) {
                String string = strings[i].trim();
                map.put(string, string);
            }
        }
        return map;
    }

    private static String getDataType(String type) {
        return type.split("\\s", 2)[0];
    }

    private static void wrapColumnName(Column column) {
        column.setFromName(SQLUtils.wrapIfReservedKeywords(column.getFromName()));
        column.setToName(SQLUtils.wrapIfReservedKeywords(column.getToName()));
    }

    static {
        String convert = FlinkJobsContext.getProperty("data.sync.columns.convert");
        if (convert != null) {
            String[] argsArr = convert.split(";");
            String fromType = null;
            StringBuilder typeBuilder = new StringBuilder();
            for (int i = 0; i < argsArr.length; ++i) {
                String script;
                String toType;
                String argsStr = argsArr[i];
                int j = 0;
                int len = argsStr.length();
                boolean sameType = false;
                while (j < len) {
                    char c;
                    if ((c = argsStr.charAt(j++)) == ',') {
                        fromType = typeBuilder.toString().trim();
                        break;
                    }
                    if (c == ':') {
                        sameType = true;
                        break;
                    }
                    typeBuilder.append(c);
                }
                typeBuilder.setLength(0);
                if (sameType) {
                    toType = fromType;
                    script = argsStr.substring(j);
                    if (StringUtils.isBlank((String)script)) {
                        throw new IllegalConfigurationException("Each item of the configuration for the key 'data.sync.columns.convert' must be in the form of '{type}:{script}' or '{fromtype},{totype}:{script}'");
                    }
                } else {
                    String[] args = argsStr.substring(j).split(":", 2);
                    if (args.length < 2) {
                        throw new IllegalConfigurationException("Each item of the configuration for the key 'data.sync.columns.convert' must be in the form of '{type}:{script}' or '{fromtype},{totype}:{script}'");
                    }
                    toType = args[0];
                    script = args[1];
                }
                columnConvertArgsMap.put(toType.toUpperCase(), new ColumnConvertArgs(fromType, script));
            }
        }
    }

    private static class ColumnConvertArgs {
        private String fromType;
        private String script;

        public ColumnConvertArgs(String fromType, String script) {
            this.fromType = fromType;
            this.script = script;
        }
    }
}

