/*
 * Decompiled with CFR 0.152.
 */
package org.deltafi.common.action;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.deltafi.common.action.ActionEventQueueProperties;
import org.deltafi.common.queue.jedis.JedisKeyedBlockingQueue;
import org.deltafi.common.queue.jedis.SortedSetEntry;
import org.deltafi.common.types.ActionEvent;
import org.deltafi.common.types.ActionExecution;
import org.deltafi.common.types.ActionInput;
import org.deltafi.common.types.ActionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Facade over a {@link JedisKeyedBlockingQueue} that moves {@link ActionInput} tasking and
 * {@link ActionEvent} results through named queues as JSON, and tracks long-running tasks
 * together with their most recent heartbeat time.
 */
public class ActionEventQueue {
    private static final Logger log = LoggerFactory.getLogger(ActionEventQueue.class);

    /** Base name of the result queue; a return address, when present, is appended as {@code dgs-<returnAddress>}. */
    public static final String DGS_QUEUE = "dgs";

    /** A long-running task whose last heartbeat is older than this is considered expired. */
    private static final Duration LONG_RUNNING_HEARTBEAT_THRESHOLD = Duration.ofSeconds(30L);

    /** Maximum length of a single JSON string value accepted when reading (16 MiB). */
    private static final int MAX_JSON_STRING_LENGTH = 16 * 1024 * 1024;

    /** Stored long-running task values are a two-element JSON list: {@code [startTime, heartbeatTime]}. */
    private static final TypeReference<List<OffsetDateTime>> TIMES_TYPE = new TypeReference<List<OffsetDateTime>>() {};

    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();

    private final JedisKeyedBlockingQueue jedisKeyedBlockingQueue;

    /**
     * Creates the queue and its underlying Jedis connection pool.
     *
     * @param actionEventQueueProperties connection settings (URL, password, default pool limits)
     * @param poolSize when positive, used for both the max-idle and max-total pool limits;
     *                 otherwise the limits come from {@code actionEventQueueProperties}
     * @throws URISyntaxException propagated from the {@link JedisKeyedBlockingQueue} constructor
     */
    public ActionEventQueue(ActionEventQueueProperties actionEventQueueProperties, int poolSize) throws URISyntaxException {
        int maxIdle = poolSize > 0 ? poolSize : actionEventQueueProperties.getMaxIdle();
        int maxTotal = poolSize > 0 ? poolSize : actionEventQueueProperties.getMaxTotal();
        this.jedisKeyedBlockingQueue = new JedisKeyedBlockingQueue(actionEventQueueProperties.getUrl(),
                actionEventQueueProperties.getPassword(), maxIdle, maxTotal);
        log.info("Jedis pool size: {}", maxTotal);
    }

    /**
     * Checks whether the input's queue already holds an entry whose JSON contains the
     * action context's name.
     *
     * @param actionInput supplies the queue name and the action name to look for
     * @return the result of the queue's pattern-based existence check
     */
    public boolean queueHasTaskingForAction(ActionInput actionInput) {
        String pattern = "*\"name\":\"" + actionInput.getActionContext().getName() + "\"*";
        return this.jedisKeyedBlockingQueue.exists(actionInput.getQueueName(), pattern);
    }

    /**
     * Serializes and queues the given action inputs in a single {@code put} call.
     * <p>
     * Inputs whose action is in the {@link ActionState#COLD_QUEUED} state are skipped. When
     * {@code checkUnique} is set, an input is also skipped (with a warning) if its queue
     * already holds an entry whose JSON contains the same did. Inputs that cannot be
     * serialized are logged and dropped; the remaining inputs are still queued.
     *
     * @param actionInputs the inputs to queue
     * @param checkUnique whether to skip inputs that look like duplicates of queued entries
     */
    public void putActions(List<ActionInput> actionInputs, boolean checkUnique) {
        List<SortedSetEntry> actions = new ArrayList<>();
        for (ActionInput actionInput : actionInputs) {
            if (actionInput.getAction() != null && actionInput.getAction().getState() == ActionState.COLD_QUEUED) {
                continue;
            }
            if (checkUnique) {
                String pattern = "*\"did\":\"" + actionInput.getActionContext().getDid() + "\"*";
                if (this.jedisKeyedBlockingQueue.exists(actionInput.getQueueName(), pattern)) {
                    log.warn("Skipping queueing for potential duplicate action event: {}", actionInput);
                    continue;
                }
            }
            try {
                actions.add(new SortedSetEntry(actionInput.getQueueName(),
                        OBJECT_MAPPER.writeValueAsString(actionInput), actionInput.getActionCreated()));
            } catch (JsonProcessingException e) {
                log.error("Unable to convert action to JSON", e);
            }
        }
        this.jedisKeyedBlockingQueue.put(actions);
    }

    /**
     * Takes the next entry from the queue named by the action class and deserializes it.
     *
     * @param actionClassName name of the queue to take from
     * @return the deserialized action input
     * @throws JsonProcessingException if the queued entry is not valid {@link ActionInput} JSON
     */
    public ActionInput takeAction(String actionClassName) throws JsonProcessingException {
        return convertInput(this.jedisKeyedBlockingQueue.take(actionClassName));
    }

    /**
     * Builds the result queue name for a return address.
     *
     * @param returnAddress optional suffix; may be {@code null}
     * @return {@code "dgs"} when there is no return address, otherwise {@code "dgs-<returnAddress>"}
     */
    private String queueName(String returnAddress) {
        return returnAddress == null ? DGS_QUEUE : DGS_QUEUE + "-" + returnAddress;
    }

    /**
     * Serializes a single result and queues it, scored with the current time.
     *
     * @param result the event to queue
     * @param returnAddress optional result-queue suffix; may be {@code null}
     * @throws JsonProcessingException if the result cannot be serialized
     */
    public void putResult(ActionEvent result, String returnAddress) throws JsonProcessingException {
        this.jedisKeyedBlockingQueue.put(new SortedSetEntry(queueName(returnAddress),
                OBJECT_MAPPER.writeValueAsString(result), OffsetDateTime.now()));
    }

    /**
     * Serializes all results and queues them in a single {@code put} call. Every entry shares
     * the same timestamp. If any result fails to serialize, nothing is queued and the first
     * failure is thrown.
     *
     * @param results the events to queue
     * @param returnAddress optional result-queue suffix; may be {@code null}
     * @throws JsonProcessingException if any result cannot be serialized
     */
    public void putResults(List<ActionEvent> results, String returnAddress) throws JsonProcessingException {
        String queueName = queueName(returnAddress);
        OffsetDateTime now = OffsetDateTime.now();
        List<SortedSetEntry> queuedResults = new ArrayList<>(results.size());
        // A plain loop lets the checked exception propagate directly; the previous stream
        // version filtered with Objects::isNull, which discarded every successful entry.
        for (ActionEvent result : results) {
            queuedResults.add(new SortedSetEntry(queueName, OBJECT_MAPPER.writeValueAsString(result), now));
        }
        this.jedisKeyedBlockingQueue.put(queuedResults);
    }

    /**
     * Takes the next entry from the result queue and deserializes it.
     *
     * @param returnAddress optional result-queue suffix; may be {@code null}
     * @return the deserialized action event
     * @throws JsonProcessingException if the queued entry is not valid {@link ActionEvent} JSON
     */
    public ActionEvent takeResult(String returnAddress) throws JsonProcessingException {
        return convertEvent(this.jedisKeyedBlockingQueue.take(queueName(returnAddress)));
    }

    /**
     * Deserializes an {@link ActionEvent} from its JSON form.
     *
     * @param element the JSON text
     * @return the deserialized event
     * @throws JsonProcessingException if the text cannot be deserialized
     */
    public static ActionEvent convertEvent(String element) throws JsonProcessingException {
        return OBJECT_MAPPER.readValue(element, ActionEvent.class);
    }

    /**
     * Deserializes an {@link ActionInput} from its JSON form.
     *
     * @param element the JSON text
     * @return the deserialized input
     * @throws JsonProcessingException if the text cannot be deserialized
     */
    public static ActionInput convertInput(String element) throws JsonProcessingException {
        return OBJECT_MAPPER.readValue(element, ActionInput.class);
    }

    /**
     * Delegates to {@link JedisKeyedBlockingQueue#setHeartbeat} for the given key.
     *
     * @param key the heartbeat key
     */
    public void setHeartbeat(String key) {
        this.jedisKeyedBlockingQueue.setHeartbeat(key);
    }

    /**
     * Delegates to {@link JedisKeyedBlockingQueue#drop} for the given action names.
     *
     * @param actionNames the names to drop
     */
    public void drop(List<String> actionNames) {
        this.jedisKeyedBlockingQueue.drop(actionNames);
    }

    /**
     * @return the keys reported by the underlying queue
     */
    public Set<String> keys() {
        return this.jedisKeyedBlockingQueue.keys();
    }

    /**
     * @param key the sorted-set key to measure
     * @return the sorted-set size reported by the underlying queue for {@code key}
     */
    public long size(String key) {
        return this.jedisKeyedBlockingQueue.sortedSetSize(key);
    }

    /**
     * Records (or refreshes) a long-running task, storing its start time and the current time
     * as the heartbeat. A serialization failure is logged and the task is not recorded.
     *
     * @param actionExecution the running task; its {@code key()} identifies the stored entry
     */
    public void recordLongRunningTask(ActionExecution actionExecution) {
        try {
            String times = OBJECT_MAPPER.writeValueAsString(
                    List.of(actionExecution.startTime().toString(), OffsetDateTime.now().toString()));
            this.jedisKeyedBlockingQueue.recordLongRunningTask(actionExecution.key(), times);
        } catch (JsonProcessingException e) {
            log.error("Unable to convert long running task information to JSON", e);
        }
    }

    /**
     * Removes the stored entry for a long-running task.
     *
     * @param actionExecution the task whose {@code key()} identifies the entry to remove
     */
    public void removeLongRunningTask(ActionExecution actionExecution) {
        this.jedisKeyedBlockingQueue.removeLongRunningTask(actionExecution.key());
    }

    /**
     * Lists the long-running tasks whose heartbeat has not expired.
     * <p>
     * Entries with unparseable values, a time list that is not exactly two elements, or a key
     * with fewer than three {@code ':'}-separated parts are logged and skipped so that one bad
     * entry cannot hide the rest.
     *
     * @return the live long-running tasks; never {@code null}
     */
    public List<ActionExecution> getLongRunningTasks() {
        Map<String, String> allTasks = this.jedisKeyedBlockingQueue.getLongRunningTasks();
        List<ActionExecution> longRunningTasks = new ArrayList<>();
        for (Map.Entry<String, String> entry : allTasks.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            try {
                List<OffsetDateTime> times = readTimes(value);
                if (times.size() != 2) {
                    log.error("Unable to deserialize long running task time information from JSON");
                    continue;
                }
                if (isHeartbeatExpired(times.get(1))) {
                    continue;
                }
                // Keys are expected to look like class:action:did.
                String[] keyParts = key.split(":");
                if (keyParts.length < 3) {
                    log.error("Unable to parse long running task key (expected class:action:did): {}", key);
                    continue;
                }
                longRunningTasks.add(new ActionExecution(keyParts[0], keyParts[1], keyParts[2], times.get(0)));
            } catch (JsonProcessingException e) {
                log.error("Unable to deserialize long running task information from JSON: {} = {}", key, value, e);
            }
        }
        return longRunningTasks;
    }

    /**
     * Removes every long-running task entry that is expired or malformed: values that cannot
     * be deserialized, time lists that are not exactly two elements, and entries whose
     * heartbeat is older than the heartbeat threshold.
     */
    public void removeExpiredLongRunningTasks() {
        Map<String, String> allTasks = this.jedisKeyedBlockingQueue.getLongRunningTasks();
        for (Map.Entry<String, String> entry : allTasks.entrySet()) {
            String key = entry.getKey();
            try {
                List<OffsetDateTime> times = readTimes(entry.getValue());
                if (times.size() != 2) {
                    this.jedisKeyedBlockingQueue.removeLongRunningTask(key);
                    log.warn("Removed long-running task with malformed data (unexpected length) with key: {}", key);
                    continue;
                }
                if (isHeartbeatExpired(times.get(1))) {
                    this.jedisKeyedBlockingQueue.removeLongRunningTask(key);
                    log.info("Removed expired long-running task with key: {}", key);
                }
            } catch (JsonProcessingException e) {
                this.jedisKeyedBlockingQueue.removeLongRunningTask(key);
                log.error("Unable to deserialize long running task information from JSON for key: {}. Removed the key.", key, e);
            }
        }
    }

    /**
     * Checks whether a live (non-expired) long-running task entry exists.
     *
     * @param clazz action class name
     * @param action action name
     * @param did the did the task is running for
     * @return {@code true} only if an entry exists, is well-formed, and its heartbeat has not
     *         expired; missing or malformed entries yield {@code false}
     */
    public boolean longRunningTaskExists(String clazz, String action, String did) {
        // Only the key is needed, so the start time is left null.
        String key = new ActionExecution(clazz, action, did, null).key();
        String serializedValue = this.jedisKeyedBlockingQueue.getLongRunningTask(key);
        if (serializedValue == null) {
            return false;
        }
        try {
            List<OffsetDateTime> times = readTimes(serializedValue);
            if (times.size() != 2) {
                log.error("Malformed long-running task data in Redis for key: {}", key);
                return false;
            }
            return !isHeartbeatExpired(times.get(1));
        } catch (JsonProcessingException e) {
            log.error("Unable to deserialize long running task information from JSON for key: {}", key, e);
            return false;
        }
    }

    /** Parses a stored long-running task value into its list of timestamps. */
    private static List<OffsetDateTime> readTimes(String value) throws JsonProcessingException {
        return OBJECT_MAPPER.readValue(value, TIMES_TYPE);
    }

    /** Returns whether the heartbeat is older than {@link #LONG_RUNNING_HEARTBEAT_THRESHOLD}. */
    private static boolean isHeartbeatExpired(OffsetDateTime heartbeatTime) {
        return heartbeatTime.plus(LONG_RUNNING_HEARTBEAT_THRESHOLD).isBefore(OffsetDateTime.now());
    }

    /**
     * Builds the shared mapper: nulls are omitted on write, unknown properties are ignored
     * on read, integers deserialize as longs, java.time types are supported, and the maximum
     * JSON string length is raised to {@link #MAX_JSON_STRING_LENGTH}.
     */
    private static ObjectMapper createObjectMapper() {
        ObjectMapper mapper = new ObjectMapper()
                .setSerializationInclusion(JsonInclude.Include.NON_NULL)
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                .configure(DeserializationFeature.USE_LONG_FOR_INTS, true)
                .registerModule(new JavaTimeModule());
        StreamReadConstraints streamReadConstraints = StreamReadConstraints.builder()
                .maxStringLength(MAX_JSON_STRING_LENGTH)
                .build();
        mapper.getFactory().setStreamReadConstraints(streamReadConstraints);
        return mapper;
    }
}

