/*
 * Decompiled with CFR 0.152.
 */
package org.zstacks.zbus.server.mq.store;

import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.protocol.MessageMode;
import org.zstacks.zbus.protocol.MqInfo;
import org.zstacks.zbus.server.mq.MessageQueue;
import org.zstacks.zbus.server.mq.RequestQueue;
import org.zstacks.zbus.server.mq.store.MessageStore;
import org.zstacks.znet.Message;
import org.zstacks.znet.MessageAdaptor;
import org.zstacks.znet.nio.IoBuffer;

public class MessageStoreSql
implements MessageStore {
    private static final Logger log = LoggerFactory.getLogger(MessageStoreSql.class);
    private static final MessageAdaptor codec = new MessageAdaptor();
    private static final String CONFIG_FILE = "sql.properties";
    private final Properties props = new Properties();
    private Connection connection;
    private String driver = "org.hsqldb.jdbcDriver";
    private String url = "jdbc:hsqldb:db/zbus";
    private String user = "sa";
    private String password = "";
    private String sqlMsgs = "CREATE TABLE IF NOT EXISTS msgs(id VARCHAR(128), msg_str VARCHAR(10240000), PRIMARY KEY(id) )";
    private String sqlMqMsgs = "CREATE TABLE IF NOT EXISTS mq_msgs(mq_id VARCHAR(128), msg_id VARCHAR(128) )";
    private String sqlMqs = "CREATE TABLE IF NOT EXISTS mqs(id VARCHAR(512), mq_info VARCHAR(10240000), PRIMARY KEY(id) )";
    private final String brokerKey;

    public MessageStoreSql(String broker) throws Exception {
        this.brokerKey = broker;
        InputStream stream = this.getClass().getClassLoader().getResourceAsStream(CONFIG_FILE);
        try {
            if (stream != null) {
                this.props.load(stream);
            } else {
                log.warn("missing properties: sql.properties");
            }
        }
        catch (IOException e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    private Connection getConnection() {
        try {
            this.url = this.props.getProperty("url", this.url).trim();
            this.user = this.props.getProperty("sa", this.user).trim();
            this.password = this.props.getProperty("password", this.password).trim();
            return DriverManager.getConnection(this.url, this.user, this.password);
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
            return null;
        }
    }

    @Override
    public void start() throws Exception {
        this.driver = this.props.getProperty("driver", this.driver).trim();
        Class.forName(this.driver);
        this.connection = this.getConnection();
        this.initDbTable();
    }

    @Override
    public void shutdown() throws Exception {
        if (this.connection == null) {
            return;
        }
        try {
            Statement st = this.connection.createStatement();
            st.execute("SHUTDOWN");
            this.connection.close();
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    private String msgKey(Message msg) {
        return msg.getMsgId();
    }

    private String mqKey(String mq) {
        return String.format("%s-%s", this.brokerKey, mq);
    }

    @Override
    public void saveMessage(Message msg) {
        try {
            String msgId = this.msgKey(msg);
            String mqId = this.mqKey(msg.getMq());
            this.update("INSERT INTO msgs(id, msg_str) VALUES(?,?)", msgId, msg.toString());
            this.update("INSERT INTO mq_msgs(mq_id, msg_id) VALUES(?,?)", mqId, msgId);
            if (log.isDebugEnabled()) {
                log.debug("save " + msgId);
            }
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public void removeMessage(Message msg) {
        try {
            String msgId = this.msgKey(msg);
            this.update("DELETE FROM msgs WHERE id=?", msgId);
            this.update("DELETE FROM mq_msgs WHERE msg_id=?", msgId);
            if (log.isDebugEnabled()) {
                log.debug("delete " + msgId);
            }
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public void onMessageQueueCreated(MessageQueue mq) {
        try {
            String mqId = this.mqKey(mq.getName());
            String json = JSON.toJSONString((Object)mq.getMqInfo());
            this.update("INSERT INTO mqs(id, mq_info) VALUES(?,?)", mqId, json);
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public void onMessageQueueRemoved(MessageQueue mq) {
        try {
            String mqId = this.mqKey(mq.getName());
            this.update("DELETE FROM mqs WHERE id=?", mqId);
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public ConcurrentMap<String, MessageQueue> loadMqTable() throws SQLException {
        ConcurrentHashMap<String, MessageQueue> res = new ConcurrentHashMap<String, MessageQueue>();
        ResultSet mqRs = this.query("SELECT * FROM mqs");
        while (mqRs.next()) {
            String mqId = mqRs.getString("id");
            String mqName = mqId.substring(mqId.indexOf(45) + 1);
            String mqInfoString = mqRs.getString("mq_info");
            MqInfo info = (MqInfo)JSON.parseObject((String)mqInfoString, MqInfo.class);
            int mode = info.getMode();
            if (!MessageMode.isEnabled(mode, MessageMode.MQ)) {
                log.warn("message queue mode not support");
                continue;
            }
            RequestQueue mq = new RequestQueue(info.getBroker(), mqName, null, mode);
            mq.setCreator(info.getCreator());
            mq.setMessageStore(this);
            String sql = String.format("SELECT msg_str FROM mq_msgs, msgs WHERE mq_msgs.msg_id = msgs.id AND mq_msgs.mq_id='%s'", mqId);
            ResultSet msgRs = this.query(sql);
            ArrayList<Message> msgs = new ArrayList<Message>();
            while (msgRs.next()) {
                String msgString = msgRs.getString("msg_str");
                IoBuffer buf = IoBuffer.wrap((String)msgString);
                Message msg = (Message)codec.decode(buf);
                if (msg != null) {
                    msgs.add(msg);
                    continue;
                }
                log.error("message decode error");
            }
            msgRs.close();
            mq.loadMessageList(msgs);
            res.put(mqName, mq);
        }
        mqRs.close();
        return res;
    }

    private void initDbTable() throws SQLException {
        this.sqlMsgs = this.props.getProperty("sql_msgs", this.sqlMsgs).trim();
        this.sqlMqMsgs = this.props.getProperty("sql_mq_msgs", this.sqlMqMsgs).trim();
        this.sqlMqs = this.props.getProperty("sql_mqs", this.sqlMqs).trim();
        this.update(this.sqlMsgs, new Object[0]);
        this.update(this.sqlMqMsgs, new Object[0]);
        this.update(this.sqlMqs, new Object[0]);
    }

    private synchronized ResultSet query(String sql) throws SQLException {
        if (this.connection == null) {
            return null;
        }
        Statement st = this.connection.createStatement();
        return st.executeQuery(sql);
    }

    private synchronized void update(String sql, Object ... args) throws SQLException {
        int i;
        if (this.connection == null) {
            return;
        }
        PreparedStatement st = this.connection.prepareStatement(sql);
        for (i = 0; i < args.length; ++i) {
            st.setObject(i + 1, args[i]);
        }
        i = st.executeUpdate();
        if (i == -1) {
            log.error("db error : " + sql);
        }
        st.close();
    }
}

