/*
 * Decompiled with CFR 0.152.
 */
package org.gorpipe.spark.redis;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.gorpipe.exceptions.ExceptionUtilities;
import org.gorpipe.spark.platform.JedisURIHelper;
import org.gorpipe.spark.platform.SharedRedisPools;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

class MonitorThread
implements Callable<String> {
    boolean running = true;
    private JedisPool jedisPool;
    private Map<String, Future<List<String>>> futureActionSet = new ConcurrentHashMap<String, Future<List<String>>>();

    public MonitorThread(String redisUri) {
        if (redisUri != null && redisUri.length() > 0) {
            this.jedisPool = SharedRedisPools.getJedisPool(JedisURIHelper.create(redisUri));
        }
    }

    public void stopRunning() {
        this.running = false;
    }

    public void addJob(String jobId, Future<List<String>> fut) {
        this.futureActionSet.put(jobId, fut);
    }

    private Duration getJobExpiration() {
        return Duration.ofMinutes(20L);
    }

    public void setValue(String[] jobIds, String field, String value) {
        if (this.jedisPool != null) {
            try (Jedis jedis = this.jedisPool.getResource();){
                for (String jobId : jobIds) {
                    jedis.hset(jobId, field, value);
                    jedis.expire(jobId, (int)this.getJobExpiration().getSeconds());
                }
            }
        }
    }

    public void setValues(String[] jobIds, String field, String[] values) {
        if (this.jedisPool != null) {
            try (Jedis jedis = this.jedisPool.getResource();){
                for (int i = 0; i < jobIds.length; ++i) {
                    String jobId = jobIds[i];
                    String value = values[i];
                    jedis.hset(jobId, field, value);
                    jedis.expire(jobId, (int)this.getJobExpiration().getSeconds());
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public String call() throws Exception {
        try {
            String reskey = null;
            while (this.running) {
                for (String key : this.futureActionSet.keySet()) {
                    Future<List<String>> fut = this.futureActionSet.get(key);
                    String[] jobIds = key.split(",");
                    try {
                        List<String> res = fut.get(500L, TimeUnit.MILLISECONDS);
                        reskey = key;
                        String[] cacheFiles = (String[])res.stream().map(s -> s.split("\t")).map(s -> s[2]).toArray(String[]::new);
                        this.setValues(jobIds, "result", cacheFiles);
                        this.setValue(jobIds, "status", "DONE");
                        break;
                    }
                    catch (ExecutionException e) {
                        reskey = key;
                        this.setValue(jobIds, "error", ExceptionUtilities.gorExceptionToJson((Throwable)e.getCause()));
                        this.setValue(jobIds, "status", "FAILED");
                        break;
                    }
                    catch (TimeoutException timeoutException) {
                    }
                }
                if (reskey != null) {
                    this.futureActionSet.remove(reskey);
                    reskey = null;
                }
                if (!this.futureActionSet.isEmpty()) continue;
                Thread.sleep(500L);
            }
        }
        finally {
            if (this.jedisPool != null) {
                this.jedisPool = null;
            }
        }
        return "";
    }
}

