/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.eventbus.jdbc;

import com.google.common.base.Throwables;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.List;
import me.ahoo.eventbus.core.repository.ConcurrentVersionConflictException;
import me.ahoo.eventbus.core.repository.PublishEventRepository;
import me.ahoo.eventbus.core.repository.PublishIdentity;
import me.ahoo.eventbus.core.repository.PublishStatus;
import me.ahoo.eventbus.core.repository.Version;
import me.ahoo.eventbus.core.repository.entity.PublishEventCompensationEntity;
import me.ahoo.eventbus.core.repository.entity.PublishEventEntity;
import me.ahoo.eventbus.core.utils.Dates;
import me.ahoo.eventbus.core.utils.Jsons;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;

public class JdbcPublishEventRepository
implements PublishEventRepository {
    private final NamedParameterJdbcTemplate jdbcTemplate;
    private static final String SQL_INITIALIZED = "insert publish_event (event_name, event_data, status,version) values (:event_name, :event_data, :status,:version);";
    private static final String SQL_MARK_STATUS = "update publish_event set status=:status,version=version+1 where id=:id and version=:version;";
    private static final String SQL_PUBLISH_FAILED = "insert publish_event_failed(publish_event_id, failed_msg) values (:publish_event_id, :failed_msg)";
    private static final String SQL_QUERY_FAILED = "select id, event_name, event_data, status, published_time, version, create_time from publish_event where status<>1 and create_time<:before and version<:max_version order by version asc limit :limit;";
    private static String SQL_COMPENSATE = "insert publish_event_compensation (publish_event_id, start_time, taken, failed_msg) values (:publish_event_id, :start_time, :taken, :failed_msg);";

    public JdbcPublishEventRepository(NamedParameterJdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public PublishIdentity initialize(String eventName, Object eventData) {
        PublishIdentity subscribeIdentity = PublishIdentity.builder().status(PublishStatus.INITIALIZED).version(Version.INITIAL_VALUE).eventName(eventName).build();
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("event_name", (Object)eventName);
        String eventDataStr = Jsons.serializeAsString((Object)eventData);
        sqlParams.addValue("event_data", (Object)eventDataStr);
        sqlParams.addValue("status", (Object)subscribeIdentity.getStatus().getValue());
        sqlParams.addValue("version", (Object)subscribeIdentity.getVersion());
        GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
        this.jdbcTemplate.update(SQL_INITIALIZED, (SqlParameterSource)sqlParams, (KeyHolder)keyHolder);
        subscribeIdentity.setId(Long.valueOf(keyHolder.getKey().longValue()));
        return subscribeIdentity;
    }

    public int markStatus(PublishIdentity publishIdentity) {
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("id", (Object)publishIdentity.getId());
        sqlParams.addValue("status", (Object)publishIdentity.getStatus().getValue());
        sqlParams.addValue("version", (Object)publishIdentity.getVersion());
        int affected = this.jdbcTemplate.update(SQL_MARK_STATUS, (SqlParameterSource)sqlParams);
        if (affected == 0) {
            String errMsg = String.format("Publish [%s] mark [%d]@[%d] to status [%s] error.", publishIdentity.getEventName(), publishIdentity.getId(), publishIdentity.getVersion(), publishIdentity.getStatus().name());
            throw new ConcurrentVersionConflictException(errMsg, (Version)publishIdentity);
        }
        return affected;
    }

    public int markSucceeded(PublishIdentity publishIdentity) {
        publishIdentity.setStatus(PublishStatus.SUCCEEDED);
        return this.markStatus(publishIdentity);
    }

    public int markFailed(PublishIdentity publishIdentity, Throwable throwable) {
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("publish_event_id", (Object)publishIdentity.getId());
        String failedMsg = Throwables.getStackTraceAsString((Throwable)throwable);
        sqlParams.addValue("failed_msg", (Object)failedMsg);
        this.jdbcTemplate.update(SQL_PUBLISH_FAILED, (SqlParameterSource)sqlParams);
        publishIdentity.setStatus(PublishStatus.FAILED);
        return this.markStatus(publishIdentity);
    }

    public List<PublishEventEntity> queryFailed(int limit, int before, int maxVersion) {
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("max_version", (Object)maxVersion);
        LocalDateTime beforeDate = LocalDateTime.now().minus(before, ChronoUnit.MINUTES);
        sqlParams.addValue("before", (Object)beforeDate);
        sqlParams.addValue("limit", (Object)limit);
        return this.jdbcTemplate.query(SQL_QUERY_FAILED, (SqlParameterSource)sqlParams, (rs, rowNum) -> {
            long id = rs.getLong("id");
            String eventName = rs.getString("event_name");
            String eventDataStr = rs.getString("event_data");
            int status = rs.getInt("status");
            Timestamp publishedTime = rs.getTimestamp("published_time");
            int version = rs.getInt("version");
            Timestamp createTime = rs.getTimestamp("create_time");
            PublishEventEntity entity = new PublishEventEntity();
            entity.setId(Long.valueOf(id));
            entity.setEventName(eventName);
            entity.setEventData((Object)eventDataStr);
            entity.setStatus(PublishStatus.valueOf((int)status));
            entity.setVersion(Integer.valueOf(version));
            entity.setPublishedTime(Dates.of((Date)publishedTime));
            entity.setCreateTime(Dates.of((Date)createTime));
            return entity;
        });
    }

    public int compensate(PublishEventCompensationEntity publishEventCompensationEntity) {
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("publish_event_id", (Object)publishEventCompensationEntity.getPublishEventId());
        sqlParams.addValue("start_time", (Object)publishEventCompensationEntity.getStartTime());
        sqlParams.addValue("taken", (Object)publishEventCompensationEntity.getTaken());
        sqlParams.addValue("failed_msg", (Object)publishEventCompensationEntity.getFailedMsg());
        return this.jdbcTemplate.update(SQL_COMPENSATE, (SqlParameterSource)sqlParams);
    }
}

