/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.eventbus.jdbc;

import com.google.common.base.Throwables;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import me.ahoo.eventbus.core.publisher.PublishEventWrapper;
import me.ahoo.eventbus.core.repository.ConcurrentVersionConflictException;
import me.ahoo.eventbus.core.repository.RepeatedSubscribeException;
import me.ahoo.eventbus.core.repository.SubscribeEventRepository;
import me.ahoo.eventbus.core.repository.SubscribeIdentity;
import me.ahoo.eventbus.core.repository.SubscribeStatus;
import me.ahoo.eventbus.core.repository.Version;
import me.ahoo.eventbus.core.repository.entity.SubscribeEventCompensationEntity;
import me.ahoo.eventbus.core.repository.entity.SubscribeEventEntity;
import me.ahoo.eventbus.core.subscriber.Subscriber;
import me.ahoo.eventbus.core.utils.Dates;
import me.ahoo.eventbus.core.utils.Jsons;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;

public class JdbcSubscribeEventRepository
implements SubscribeEventRepository {
    private final NamedParameterJdbcTemplate jdbcTemplate;
    private static final String SQL_GET_SUBSCRIBE_EVENT = "select id,status,version from subscribe_event where event_id=:event_id and event_name=:event_name and subscribe_name=:subscribe_name limit 1;";
    private static final String SQL_SUBSCRIBE_INITIALIZED = "insert subscribe_event(subscribe_name, status, event_id, event_name, event_data, event_create_time, version)values (:subscribe_name, :status, :event_id, :event_name, :event_data, :event_create_time, :version);";
    private static final String SQL_MARK_STATUS = "update subscribe_event set status=:status,version=version+1 where id=:id and version=:version;";
    private static final String SQL_SUBSCRIBE_FAILED = "insert subscribe_event_failed (subscribe_event_id, failed_msg) values (:subscribe_event_id, :failed_msg)";
    private static final String SQL_QUERY_FAILED = "select id, subscribe_name, status, subscribe_time, event_id, event_name, event_data, event_create_time, version, create_time from subscribe_event where status<>1 and create_time<:before and version<:max_version order by version asc limit :limit;";
    private static String SQL_COMPENSATE = "insert subscribe_event_compensation (subscribe_event_id, start_time, taken, failed_msg) values (:subscribe_event_id, :start_time, :taken, :failed_msg) ";

    public JdbcSubscribeEventRepository(NamedParameterJdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    private Optional<SubscribeIdentity> getSubscribeIdentity(Subscriber subscriber, Long eventId, String eventName) {
        MapSqlParameterSource getSubscribeEventParams = new MapSqlParameterSource("event_id", (Object)eventId);
        getSubscribeEventParams.addValue("event_name", (Object)eventName);
        getSubscribeEventParams.addValue("subscribe_name", (Object)subscriber.getName());
        try {
            SubscribeIdentity subscribeIdentity = (SubscribeIdentity)this.jdbcTemplate.queryForObject(SQL_GET_SUBSCRIBE_EVENT, (SqlParameterSource)getSubscribeEventParams, (rs, rowNum) -> {
                long id = rs.getLong("id");
                int status = rs.getInt("status");
                int version = rs.getInt("version");
                return SubscribeIdentity.builder().id(Long.valueOf(id)).subscriberName(subscriber.getName()).status(SubscribeStatus.valeOf((int)status)).version(Integer.valueOf(version)).build();
            });
            return Optional.of(subscribeIdentity);
        }
        catch (EmptyResultDataAccessException ex) {
            return Optional.empty();
        }
    }

    private SubscribeIdentity initializeSubscribeIdentity(Subscriber subscriber, PublishEventWrapper subscribePublishEventWrapper, String eventName) {
        SubscribeIdentity subscribeIdentity = SubscribeIdentity.builder().status(SubscribeStatus.INITIALIZED).version(Version.INITIAL_VALUE).subscriberName(subscriber.getName()).build();
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("subscribe_name", (Object)subscriber.getName());
        sqlParams.addValue("status", (Object)subscribeIdentity.getStatus().getValue());
        sqlParams.addValue("event_id", (Object)subscribePublishEventWrapper.getId());
        sqlParams.addValue("event_name", (Object)eventName);
        String eventData = Jsons.serializeAsString((Object)subscribePublishEventWrapper.getEventData());
        sqlParams.addValue("event_data", (Object)eventData);
        sqlParams.addValue("event_create_time", (Object)subscribePublishEventWrapper.getCreateTime());
        sqlParams.addValue("version", (Object)subscribeIdentity.getVersion());
        GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
        this.jdbcTemplate.update(SQL_SUBSCRIBE_INITIALIZED, (SqlParameterSource)sqlParams, (KeyHolder)keyHolder);
        subscribeIdentity.setId(Long.valueOf(keyHolder.getKey().longValue()));
        return subscribeIdentity;
    }

    public SubscribeIdentity initialize(Subscriber subscriber, PublishEventWrapper subscribePublishEventWrapper) throws RepeatedSubscribeException {
        String eventName = subscribePublishEventWrapper.getEventName();
        Optional<SubscribeIdentity> subscribeIdentity = this.getSubscribeIdentity(subscriber, subscribePublishEventWrapper.getId(), eventName);
        if (subscribeIdentity.isPresent()) {
            if (subscribeIdentity.get().getStatus().equals((Object)SubscribeStatus.SUCCEEDED)) {
                throw new RepeatedSubscribeException(subscriber, subscribePublishEventWrapper);
            }
            return subscribeIdentity.get();
        }
        return this.initializeSubscribeIdentity(subscriber, subscribePublishEventWrapper, eventName);
    }

    public int markStatus(SubscribeIdentity subscribeIdentity) {
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("id", (Object)subscribeIdentity.getId());
        sqlParams.addValue("status", (Object)subscribeIdentity.getStatus().getValue());
        sqlParams.addValue("version", (Object)subscribeIdentity.getVersion());
        int affected = this.jdbcTemplate.update(SQL_MARK_STATUS, (SqlParameterSource)sqlParams);
        if (affected == 0) {
            String errMsg = String.format("Subscribe [%s] mark [%d]@[%d] to status [%s] error.", subscribeIdentity.getSubscriberName(), subscribeIdentity.getId(), subscribeIdentity.getVersion(), subscribeIdentity.getStatus().name());
            throw new ConcurrentVersionConflictException(errMsg, (Version)subscribeIdentity);
        }
        return affected;
    }

    public int markSucceeded(SubscribeIdentity subscribeIdentity) {
        subscribeIdentity.setStatus(SubscribeStatus.SUCCEEDED);
        return this.markStatus(subscribeIdentity);
    }

    public int markFailed(SubscribeIdentity subscribeIdentity, Throwable throwable) {
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("subscribe_event_id", (Object)subscribeIdentity.getId());
        String failedMsg = Throwables.getStackTraceAsString((Throwable)throwable);
        sqlParams.addValue("failed_msg", (Object)failedMsg);
        this.jdbcTemplate.update(SQL_SUBSCRIBE_FAILED, (SqlParameterSource)sqlParams);
        subscribeIdentity.setStatus(SubscribeStatus.FAILED);
        return this.markStatus(subscribeIdentity);
    }

    public List<SubscribeEventEntity> queryFailed(int limit, int before, int maxVersion) {
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("max_version", (Object)maxVersion);
        LocalDateTime beforeDate = LocalDateTime.now().minus(before, ChronoUnit.MINUTES);
        sqlParams.addValue("before", (Object)beforeDate);
        sqlParams.addValue("limit", (Object)limit);
        return this.jdbcTemplate.query(SQL_QUERY_FAILED, (SqlParameterSource)sqlParams, (rs, rowNum) -> {
            long id = rs.getLong("id");
            String subscribeName = rs.getString("subscribe_name");
            int status = rs.getInt("status");
            Timestamp subscribeTime = rs.getTimestamp("subscribe_time");
            long eventId = rs.getLong("event_id");
            String eventName = rs.getString("event_name");
            String eventDataStr = rs.getString("event_data");
            Timestamp eventCreateTime = rs.getTimestamp("event_create_time");
            int version = rs.getInt("version");
            Timestamp createTime = rs.getTimestamp("create_time");
            SubscribeEventEntity entity = new SubscribeEventEntity();
            entity.setId(Long.valueOf(id));
            entity.setSubscriberName(subscribeName);
            entity.setStatus(SubscribeStatus.valeOf((int)status));
            entity.setSubscribeTime(Dates.of((Date)subscribeTime));
            entity.setEventId(Long.valueOf(eventId));
            entity.setEventName(eventName);
            entity.setEventData(eventDataStr);
            entity.setEventCreateTime(Dates.of((Date)eventCreateTime));
            entity.setVersion(Integer.valueOf(version));
            entity.setCreateTime(Dates.of((Date)createTime));
            return entity;
        });
    }

    public int compensate(SubscribeEventCompensationEntity subscribeEventCompensationEntity) {
        MapSqlParameterSource sqlParams = new MapSqlParameterSource("subscribe_event_id", (Object)subscribeEventCompensationEntity.getSubscribeEventId());
        sqlParams.addValue("start_time", (Object)subscribeEventCompensationEntity.getStartTime());
        sqlParams.addValue("taken", (Object)subscribeEventCompensationEntity.getTaken());
        sqlParams.addValue("failed_msg", (Object)subscribeEventCompensationEntity.getFailedMsg());
        return this.jdbcTemplate.update(SQL_COMPENSATE, (SqlParameterSource)sqlParams);
    }
}

