/*
 * Decompiled with CFR 0.152.
 */
package org.qubership.nifi.service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.Tuple;
import org.qubership.nifi.service.BulkDistributedMapCacheClient;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.types.Expiration;

/**
 * Controller service that implements {@link BulkDistributedMapCacheClient} on top of Redis.
 *
 * <p>Every operation borrows a {@link RedisConnection} from the configured
 * {@link RedisConnectionPool}, runs against it and closes it again. Keys and values are converted
 * to bytes with the caller-supplied {@link Serializer}s (which write to an {@link OutputStream}).
 *
 * <p>The {@link #TTL} property is read once in {@link #onEnabled(ConfigurationContext)}; a value of
 * {@code 0} is stored internally as {@code -1}, meaning "no expiration".
 */
@Tags(value={"redis", "distributed", "cache", "map"})
@CapabilityDescription(value="Map cache client backed by Redis that supports single-key operations as well as bulk getAndPutIfAbsent and remove operations. Connections are obtained from the configured Redis Connection Pool.")
public class RedisBulkDistributedMapCacheClientService
extends AbstractControllerService
implements BulkDistributedMapCacheClient {
    /** Controller service that supplies Redis connections. */
    public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
            .name("redis-connection-pool")
            .displayName("Redis Connection Pool")
            .identifiesControllerService(RedisConnectionPool.class)
            .required(false)
            .build();

    /** Lifetime of entries written by this service; {@code 0 secs} disables expiration. */
    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
            .name("redis-cache-ttl")
            .displayName("TTL")
            .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .required(true)
            .defaultValue("0 secs")
            .build();

    /** Internal marker for "entries never expire". */
    private static final long NO_EXPIRATION = -1L;

    /**
     * Lua script behind {@link #getAndPutIfAbsent}. KEYS holds the n keys; ARGV holds the n values
     * followed by one extra element, the TTL in seconds. For each key the current value is read;
     * if there is none, the supplied value is stored (with {@code EX ttl} when the TTL is positive).
     * The script returns the previous values in key order, with a nil entry for keys that were absent.
     * The TTL is forwarded to SET as the original string to avoid any number formatting in Lua.
     */
    private static final String GET_AND_PUT_IF_ABSENT_SCRIPT =
            "local result = {}\n"
            + "local ttl = ARGV[#KEYS + 1]\n"
            + "local expires = (tonumber(ttl) or 0) > 0\n"
            + "for i in ipairs(KEYS) do\n"
            + "  local currentValue = redis.call(\"GET\", KEYS[i])\n"
            + "  if (not currentValue) then\n"
            + "    if expires then\n"
            + "      redis.call(\"SET\", KEYS[i], ARGV[i], \"EX\", ttl)\n"
            + "    else\n"
            + "      redis.call(\"SET\", KEYS[i], ARGV[i])\n"
            + "    end\n"
            + "  end\n"
            + "  result[i] = currentValue\n"
            + "end\n"
            + "return result";

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
    private static final Serializer<String> STRING_SERIALIZER =
            (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
    // Decodes with UTF-8 explicitly so it mirrors STRING_SERIALIZER regardless of the platform charset.
    private static final Deserializer<String> VALUE_DESERIALIZER =
            input -> new String(input, StandardCharsets.UTF_8);

    static {
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(REDIS_CONNECTION_POOL);
        props.add(TTL);
        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
    }

    private volatile RedisConnectionPool redisConnectionPool;
    /** TTL in seconds, or {@link #NO_EXPIRATION}. Volatile because it is read outside the enabling thread. */
    private volatile long ttlValue = NO_EXPIRATION;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Reads the TTL and the connection pool from the configuration.
     *
     * @param context configuration of this controller service
     * @throws InitializationException declared for the lifecycle contract; not thrown by this code directly
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext context) throws InitializationException {
        final Long configuredTtl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
        // Assign the TTL in a single step (0 -> NO_EXPIRATION) and before the pool becomes visible.
        this.ttlValue = (configuredTtl == null || configuredTtl == 0L) ? NO_EXPIRATION : configuredTtl;
        this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
    }

    /** Drops the reference to the connection pool when the service is disabled. */
    @OnDisabled
    public void shutdown() {
        this.redisConnectionPool = null;
    }

    /**
     * For every entry of {@code map}, returns the value currently stored in Redis under that key and,
     * if there is none, stores the supplied value. All keys are processed by one Lua script invocation.
     * Newly stored values receive the configured TTL (none when the TTL is {@code 0 secs}).
     *
     * @param map               keys to look up, with the values to store when a key is absent
     * @param keySerializer     serializer for keys
     * @param valueSerializer   serializer for values
     * @param valueDeserializer deserializer applied to values that already existed
     * @return a map from each input key to its previous value, or {@code null} if the key was absent;
     *         an empty map if {@code map} is empty
     * @throws IOException if serialization fails or no connection pool is available
     */
    public <K, V> Map<K, V> getAndPutIfAbsent(Map<K, V> map, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
        if (map.isEmpty()) {
            // Nothing to look up or store; skip the Redis round trip (same approach as remove()).
            return new HashMap<>();
        }
        return this.withConnection(redisConnection -> {
            final List<K> keys = new ArrayList<>();
            final List<V> values = new ArrayList<>();
            map.forEach((key, value) -> {
                keys.add(key);
                values.add(value);
            });

            // Layout expected by evalSha: [key_1..key_n, value_1..value_n, ttl]
            final int count = keys.size();
            final byte[][] serialisedParams = new byte[count * 2 + 1][];
            for (int i = 0; i < count; i++) {
                serialisedParams[i] = this.serialize(keys.get(i), keySerializer);
                serialisedParams[count + i] = this.serialize(values.get(i), valueSerializer);
            }
            serialisedParams[count * 2] = this.serialize(Long.toString(this.ttlValue), STRING_SERIALIZER);

            final String scriptSha = redisConnection.scriptingCommands()
                    .scriptLoad(this.serialize(GET_AND_PUT_IF_ABSENT_SCRIPT, STRING_SERIALIZER));
            final List<byte[]> oldValues = redisConnection.scriptingCommands()
                    .evalSha(scriptSha, ReturnType.MULTI, count, serialisedParams);

            final Map<K, V> previous = new HashMap<>();
            for (int i = 0; i < oldValues.size(); i++) {
                final byte[] oldValue = oldValues.get(i);
                previous.put(keys.get(i), oldValue == null ? null : valueDeserializer.deserialize(oldValue));
            }
            return previous;
        });
    }

    /**
     * Deletes all given keys with a single DEL call.
     *
     * @param keys          keys to delete
     * @param keySerializer serializer for keys
     * @return the number of keys reported as removed by Redis; {@code 0} if {@code keys} is empty
     * @throws IOException if serialization fails or no connection pool is available
     */
    public <K> long remove(List<K> keys, Serializer<K> keySerializer) throws IOException {
        if (keys.isEmpty()) {
            return 0L;
        }
        return this.withConnection(redisConnection -> {
            final byte[][] serialisedKeys = new byte[keys.size()][];
            for (int i = 0; i < keys.size(); i++) {
                serialisedKeys[i] = this.serialize(keys.get(i), keySerializer);
            }
            final Long removed = redisConnection.del(serialisedKeys);
            return removed == null ? 0L : removed;
        });
    }

    /**
     * Stores {@code value} under {@code key} only if the key does not exist yet, and applies the
     * configured TTL to a newly created entry.
     *
     * @return {@code true} if the value was stored, {@code false} if the key already existed
     * @throws IOException if serialization fails or no connection pool is available
     */
    public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        return this.withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = this.serialize(key, value, keySerializer, valueSerializer);
            final boolean wasSet = Boolean.TRUE.equals(redisConnection.setNX(kv.getKey(), kv.getValue()));
            final long ttl = this.ttlValue;
            // NOTE(review): SETNX and EXPIRE are two separate commands, so a failure in between
            // leaves the key without a TTL.
            if (wasSet && ttl != NO_EXPIRATION) {
                redisConnection.expire(kv.getKey(), ttl);
            }
            return wasSet;
        });
    }

    /**
     * Checks whether {@code key} exists in Redis.
     *
     * @throws IOException if serialization fails or no connection pool is available
     */
    public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
        return this.withConnection(redisConnection -> {
            final byte[] k = this.serialize(key, keySerializer);
            return Boolean.TRUE.equals(redisConnection.exists(k));
        });
    }

    /**
     * Stores {@code value} under {@code key}, overwriting any existing value, with the configured TTL.
     *
     * @throws IOException if serialization fails or no connection pool is available
     */
    public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        this.withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> kv = this.serialize(key, value, keySerializer, valueSerializer);
            // ttlValue is -1 when expiration is disabled; presumably Expiration treats -1 as persistent — see Spring Data Redis docs.
            redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(this.ttlValue), RedisStringCommands.SetOption.upsert());
            return null;
        });
    }

    /**
     * Reads the value stored under {@code key}.
     *
     * @return whatever {@code valueDeserializer} produces for the bytes returned by Redis; the bytes
     *         are handed to the deserializer unchanged, including when Redis returns none
     * @throws IOException if (de)serialization fails or no connection pool is available
     */
    public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
        return this.withConnection(redisConnection -> {
            final byte[] k = this.serialize(key, keySerializer);
            final byte[] v = redisConnection.get(k);
            return valueDeserializer.deserialize(v);
        });
    }

    /**
     * Runs {@code action} with a connection from the pool and always closes the connection afterwards.
     * A failure while closing is logged as a warning and does not replace the action's outcome.
     *
     * @throws IOException if no connection pool is available (not configured, or the service is
     *                     disabled) or if the action itself throws it
     */
    private <T> T withConnection(RedisAction<T> action) throws IOException {
        // Read the volatile field once so a concurrent shutdown() cannot null it mid-call.
        final RedisConnectionPool pool = this.redisConnectionPool;
        if (pool == null) {
            throw new IOException("Redis Connection Pool is not available: it is not configured or the service is disabled");
        }
        RedisConnection redisConnection = null;
        try {
            redisConnection = pool.getConnection();
            return action.execute(redisConnection);
        }
        finally {
            if (redisConnection != null) {
                try {
                    redisConnection.close();
                }
                catch (Exception e) {
                    this.getLogger().warn("Error closing connection: " + e.getMessage(), (Throwable)e);
                }
            }
        }
    }

    /** Serializes a key/value pair into a tuple of byte arrays, reusing one buffer for both. */
    private <K, V> Tuple<byte[], byte[]> serialize(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        keySerializer.serialize(key, out);
        final byte[] k = out.toByteArray();
        out.reset();
        valueSerializer.serialize(value, out);
        final byte[] v = out.toByteArray();
        return new Tuple<>(k, v);
    }

    /** Serializes a single object into a byte array with the given serializer. */
    private <K> byte[] serialize(K key, Serializer<K> keySerializer) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        keySerializer.serialize(key, out);
        return out.toByteArray();
    }
}

