/*
 * Decompiled with CFR 0.152.
 */
package cn.tenmg.flink.jobs.operator;

import cn.tenmg.dsl.utils.StringUtils;
import cn.tenmg.flink.jobs.context.FlinkJobsContext;
import cn.tenmg.flink.jobs.metadata.MetaDataGetter;
import cn.tenmg.flink.jobs.metadata.MetaDataGetterFactory;
import cn.tenmg.flink.jobs.model.CreateTable;
import cn.tenmg.flink.jobs.model.create.table.Column;
import cn.tenmg.flink.jobs.operator.AbstractOperator;
import cn.tenmg.flink.jobs.utils.DataSourceFilterUtils;
import cn.tenmg.flink.jobs.utils.SQLUtils;
import cn.tenmg.flink.jobs.utils.StreamTableEnvironmentUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that assembles a Flink SQL {@code CREATE TABLE} statement from a
 * {@link CreateTable} configuration and executes it on the stream table
 * environment.
 *
 * <p>Two modes are supported. In smart mode the column list (and, when not
 * configured, the primary key) is completed from the table metadata obtained
 * through a {@link MetaDataGetter}. In manual mode the configured columns are
 * used as they are and at least one column must be configured.
 */
public class CreateTableOperator
extends AbstractOperator<CreateTable> {
    private static final Logger log = LoggerFactory.getLogger(CreateTableOperator.class);

    /** Smart-mode default taken from the {@code flink.jobs.smart} property; used when the configuration does not set it. */
    private static final boolean SMART_DEFAULT = Boolean.parseBoolean(FlinkJobsContext.getProperty("flink.jobs.smart"));

    /**
     * Builds the {@code CREATE TABLE} statement for the given configuration and executes it.
     *
     * @param env         the stream execution environment the table environment is obtained for
     * @param createTable the table creation configuration
     * @param params      runtime parameters (not used by this operator)
     * @return the result of {@code StreamTableEnvironment.executeSql}
     * @throws IllegalArgumentException if {@code dataSource} or {@code tableName} is blank, or if no
     *                                  column can be determined for the table
     * @throws Exception                if metadata lookup or SQL execution fails
     */
    @Override
    public Object execute(StreamExecutionEnvironment env, CreateTable createTable, Map<String, Object> params) throws Exception {
        String dataSourceName = createTable.getDataSource();
        String tableName = createTable.getTableName();
        if (StringUtils.isBlank(dataSourceName) || StringUtils.isBlank(tableName)) {
            throw new IllegalArgumentException("The property 'dataSource' or 'tableName' cannot be blank.");
        }
        StreamTableEnvironment tableEnv = FlinkJobsContext.getOrCreateStreamTableEnvironment(env);
        StreamTableEnvironmentUtils.useCatalogOrDefault(tableEnv, createTable.getCatalog());
        Map<String, String> dataSourceConfig = DataSourceFilterUtils.filter(createTable.getDataSourceFilter(), FlinkJobsContext.getDatasource(dataSourceName));
        // collation() normalizes createTable's column list in place, so it must run before getColumns() below.
        String primaryKey = CreateTableOperator.collation(createTable, dataSourceConfig);
        String sql = CreateTableOperator.createTableSQL(dataSourceConfig, tableName, createTable.getBindTableName(), createTable.getColumns(), primaryKey);
        if (log.isInfoEnabled()) {
            // The guard avoids the cost of SQLUtils.hiddePassword when INFO logging is disabled.
            log.info("Create table by Flink SQL: {}", SQLUtils.hiddePassword(sql));
        }
        return tableEnv.executeSql(sql);
    }

    /**
     * Normalizes the column list of the configuration in place and resolves the primary key.
     *
     * <p>A {@code null} column list is replaced by an empty mutable list that is also set on the
     * configuration. In smart mode the metadata columns are appended after the configured ones and
     * the metadata primary keys are used when no primary key is configured.
     *
     * @param createTable the table creation configuration; its column list is modified
     * @param dataSource  the (filtered) data source properties
     * @return the primary key expression, or {@code null} if none is configured or found
     * @throws IllegalArgumentException if the resulting column list is empty
     * @throws Exception                if the metadata lookup fails
     */
    private static String collation(CreateTable createTable, Map<String, String> dataSource) throws Exception {
        List<Column> columns = createTable.getColumns();
        if (columns == null) {
            columns = new ArrayList<>();
            createTable.setColumns(columns);
        }
        Boolean smart = createTable.getSmart();
        if (smart == null) {
            smart = SMART_DEFAULT;
        }
        String primaryKey = createTable.getPrimaryKey();
        if (Boolean.TRUE.equals(smart)) {
            MetaDataGetter metaDataGetter = MetaDataGetterFactory.getMetaDataGetter(dataSource);
            // NOTE(review): metadata is looked up by tableName even when bindTableName is configured -- confirm this is intended.
            MetaDataGetter.TableMetaData tableMetaData = metaDataGetter.getTableMetaData(dataSource, createTable.getTableName());
            Set<String> primaryKeys = tableMetaData.getPrimaryKeys();
            if (primaryKey == null && primaryKeys != null && !primaryKeys.isEmpty()) {
                primaryKey = String.join(",", primaryKeys);
            }
            if (!columns.isEmpty()) {
                CreateTableOperator.collationCustom(columns);
            }
            CreateTableOperator.addSmartLoadColumns(columns, tableMetaData.getColumns());
            if (columns.isEmpty()) {
                // Fail with a clear message instead of an IndexOutOfBoundsException while building the SQL.
                throw new IllegalArgumentException("No columns were found for table '" + createTable.getTableName() + "' in smart mode, and no columns are configured");
            }
        } else {
            if (columns.isEmpty()) {
                throw new IllegalArgumentException("At least one column must be configured in manual mode, or set the configuration 'flink.jobs.smart=true' to enable automatic column acquisition in smart mode");
            }
            CreateTableOperator.collationCustom(columns);
        }
        return primaryKey;
    }

    /**
     * Appends one column per metadata entry (key: column name, value: column type) to the list.
     *
     * <p>Metadata columns whose wrapped name equals the name of a column already in the list are
     * skipped, so that a configured column takes precedence and the generated statement does not
     * declare the same column twice.
     *
     * @param columns    the column list to extend; configured columns are expected to be wrapped already
     * @param columnsMap the metadata columns; {@code null} is treated as empty
     */
    private static void addSmartLoadColumns(List<Column> columns, Map<String, String> columnsMap) {
        if (columnsMap == null || columnsMap.isEmpty()) {
            return;
        }
        Set<String> knownNames = new HashSet<>();
        for (Column existing : columns) {
            knownNames.add(existing.getName());
        }
        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
            String name = SQLUtils.wrapIfReservedKeywords(entry.getKey());
            if (!knownNames.add(name)) {
                continue; // already declared by the configuration (or by an earlier entry)
            }
            Column column = new Column();
            column.setName(name);
            column.setType(entry.getValue());
            columns.add(column);
        }
    }

    /**
     * Passes the name of every configured column through {@code SQLUtils.wrapIfReservedKeywords}.
     *
     * @param columns the configured columns; modified in place
     */
    private static void collationCustom(List<Column> columns) {
        for (Column column : columns) {
            column.setName(SQLUtils.wrapIfReservedKeywords(column.getName()));
        }
    }

    /**
     * Assembles the {@code CREATE TABLE ... WITH (...)} statement.
     *
     * @param dataSource    the data source properties appended to the {@code WITH} clause
     * @param tableName     the name of the table to create
     * @param bindTableName the table name passed to {@code SQLUtils.appendDataSource}; when blank,
     *                      {@code tableName} is used instead
     * @param columns       the column definitions, in declaration order
     * @param primaryKey    the primary key expression; no constraint is emitted when blank
     * @return the complete SQL statement
     * @throws IOException declared for {@code SQLUtils.appendDataSource}
     */
    private static String createTableSQL(Map<String, String> dataSource, String tableName, String bindTableName, List<Column> columns, String primaryKey) throws IOException {
        // StringBuffer (not StringBuilder) because it is handed to SQLUtils.appendDataSource.
        StringBuffer sqlBuffer = new StringBuffer();
        sqlBuffer.append("CREATE TABLE ").append(SQLUtils.wrapIfReservedKeywords(tableName)).append("(");
        boolean first = true;
        for (Column column : columns) {
            if (first) {
                first = false;
            } else {
                sqlBuffer.append(", ");
            }
            sqlBuffer.append(column.getName()).append(' ').append(column.getType());
        }
        if (StringUtils.isNotBlank(primaryKey)) {
            sqlBuffer.append(", ").append("PRIMARY KEY (").append(primaryKey).append(") NOT ENFORCED");
        }
        sqlBuffer.append(") ").append("WITH (");
        SQLUtils.appendDataSource(sqlBuffer, dataSource, StringUtils.isBlank(bindTableName) ? tableName : bindTableName);
        sqlBuffer.append(")");
        return sqlBuffer.toString();
    }
}

