/*
 * Decompiled with CFR 0.152.
 */
package cn.mzhong.janytask.jdbc.mapper;

import cn.mzhong.janytask.core.TaskContext;
import cn.mzhong.janytask.jdbc.BytesMessage;
import cn.mzhong.janytask.jdbc.DataSourceHelper;
import cn.mzhong.janytask.jdbc.mapper.MessageMapper;
import cn.mzhong.janytask.tool.PRInvoker;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.LinkedList;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public abstract class AbstractMessageMapper
implements MessageMapper {
    protected TaskContext context;
    protected DataSourceHelper sqlExecutor;
    protected String table;

    public AbstractMessageMapper(TaskContext context, DataSourceHelper sqlExecutor, String table) {
        this.context = context;
        this.sqlExecutor = sqlExecutor;
        this.table = table;
    }

    @Override
    public void init() {
        if (!this.isTableExists()) {
            this.createTable();
        }
    }

    @Override
    public void save(BytesMessage message) {
        this.sqlExecutor.update("INSERT INTO " + this.table + "(MESSAGE_ID,QUEUE_ID,PUSH_TIME,CONTENT,STATUS) VALUES (?,?,?,?,?)", message.getId(), message.getQueueId(), new Timestamp(message.getPushTime().getTime()), message.getContentBytes(), "W");
    }

    @Override
    public LinkedList<String> keys() {
        return this.sqlExecutor.queryList("SELECT MESSAGE_ID FROM " + this.table + " WHERE STATUS=?", new Object[]{"W"}, new PRInvoker<ResultSet, String>(){

            @Override
            public String invoke(ResultSet resultSet) throws Exception {
                return resultSet.getString(1);
            }
        });
    }

    @Override
    public boolean lock(String key) {
        return 1 == this.sqlExecutor.update("UPDATE " + this.table + " SET STATUS=? WHERE MESSAGE_ID=? AND STATUS=?", "L", key, "W");
    }

    @Override
    public boolean unLock(String key) {
        return 1 == this.sqlExecutor.update("UPDATE " + this.table + " SET STATUS=? WHERE MESSAGE_ID=? AND STATUS=?", "W", key, "L");
    }

    @Override
    public BytesMessage get(final String key) {
        return this.sqlExecutor.query("SELECT CONTENT FROM " + this.table + " WHERE MESSAGE_ID=?", new Object[]{key}, new PRInvoker<ResultSet, BytesMessage>(){

            @Override
            public BytesMessage invoke(ResultSet resultSet) throws Exception {
                BytesMessage message = new BytesMessage();
                message.setId(key);
                message.setContentBytes(resultSet.getBytes(1));
                return message;
            }
        });
    }

    @Override
    public void done(BytesMessage message) {
        this.sqlExecutor.update("UPDATE FROM " + this.table + " SET STATUS=?, DONE_TIME=? WHERE MESSAGE_ID=?", "D", new Timestamp(message.getDoneTime().getTime()), message.getId());
    }

    @Override
    public void error(BytesMessage message) {
        this.sqlExecutor.update("UPDATE " + this.table + " SET STATUS=?, ERROR_TIME=?, THROWABLE=? WHERE MESSAGE_ID=?", "E", new Timestamp(message.getErrorTime().getTime()), message.getThrowableBytes(), message.getId());
    }

    @Override
    public long length(String lineID) {
        return this.sqlExecutor.queryLong("SELECT COUNT(*) FROM " + this.table + " WHERE QUEUE_ID=? AND STATUS=?", new Object[]{lineID, "W"});
    }
}

