/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.messagestore.redis;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.message.DefaultHeaderAttr;
import de.otto.synapse.message.HeaderAttr;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.messagestore.Index;
import de.otto.synapse.messagestore.Indexer;
import de.otto.synapse.messagestore.MessageStore;
import de.otto.synapse.messagestore.MessageStoreEntry;
import de.otto.synapse.messagestore.redis.BatchedRedisHashedListIterator;
import de.otto.synapse.translator.Decoder;
import de.otto.synapse.translator.Encoder;
import de.otto.synapse.translator.MessageFormat;
import de.otto.synapse.translator.TextDecoder;
import de.otto.synapse.translator.TextEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterators;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;

@Beta
public class RedisIndexedMessageStore
implements MessageStore {
    private static final Logger LOG = LoggerFactory.getLogger(RedisIndexedMessageStore.class);
    private static final int CHARACTERISTICS = 1296;
    private final String name;
    private final Indexer indexer;
    private final RedisTemplate<String, String> redisTemplate;
    private final int batchSize;
    private final int maxSize;
    private final Encoder<String> encoder;
    private final Decoder<String> decoder;
    private final long maxAge;

    public RedisIndexedMessageStore(String name, int batchSize, int maxMessages, long maxAge, Indexer indexer, RedisTemplate<String, String> stringRedisTemplate) {
        this(name, batchSize, maxMessages, maxAge, indexer, stringRedisTemplate, (Encoder<String>)new TextEncoder(MessageFormat.V2), (Decoder<String>)new TextDecoder());
    }

    public RedisIndexedMessageStore(String name, int batchSize, int maxMessages, long maxAge, Indexer indexer, RedisTemplate<String, String> stringRedisTemplate, Encoder<String> messageEncoder, Decoder<String> messageDecoder) {
        this.name = name;
        this.maxAge = maxAge;
        this.indexer = indexer;
        this.redisTemplate = stringRedisTemplate;
        this.batchSize = batchSize;
        this.maxSize = maxMessages;
        this.encoder = messageEncoder;
        this.decoder = messageDecoder;
    }

    public String getName() {
        return this.name;
    }

    public void add(MessageStoreEntry entry) {
        final MessageStoreEntry indexedEntry = this.indexer.index(entry);
        final TextMessage textMessage = indexedEntry.getTextMessage();
        final String messageId = this.messageIdCalculator(textMessage);
        List txResults = (List)this.redisTemplate.execute((SessionCallback)new SessionCallback<List<Object>>(){

            public List<Object> execute(RedisOperations operations) throws DataAccessException {
                operations.multi();
                textMessage.getHeader().getShardPosition().ifPresent(shardPosition -> {
                    String channelPosKey = RedisIndexedMessageStore.this.name + "-" + indexedEntry.getChannelName() + "-channelPos";
                    BoundHashOperations channelPosHash = operations.boundHashOps((Object)channelPosKey);
                    channelPosHash.put((Object)shardPosition.shardName(), (Object)shardPosition.position());
                });
                String channelNamesKey = RedisIndexedMessageStore.this.name + "-channels";
                BoundSetOperations channelNamesSet = operations.boundSetOps((Object)channelNamesKey);
                channelNamesSet.add(new Object[]{indexedEntry.getChannelName()});
                String messageHashKey = RedisIndexedMessageStore.this.name + "-message-" + messageId;
                BoundHashOperations messageHash = operations.boundHashOps((Object)messageHashKey);
                messageHash.putAll((Map)RedisIndexedMessageStore.this.encode(indexedEntry));
                messageHash.expire(RedisIndexedMessageStore.this.maxAge, TimeUnit.SECONDS);
                String messagesListKey = RedisIndexedMessageStore.this.name + "-messages";
                BoundListOperations messageList = operations.boundListOps((Object)messagesListKey);
                messageList.rightPush((Object)messageHashKey);
                messageList.expire(RedisIndexedMessageStore.this.maxAge, TimeUnit.SECONDS);
                messageList.trim((long)(-RedisIndexedMessageStore.this.maxSize), -1L);
                indexedEntry.getFilterValues().entrySet().forEach(filterEntry -> {
                    String indexListKey = RedisIndexedMessageStore.this.name + "-" + ((Index)filterEntry.getKey()).getName() + "-" + (String)filterEntry.getValue();
                    BoundListOperations partitionIndexList = operations.boundListOps((Object)indexListKey);
                    partitionIndexList.rightPush((Object)messageHashKey);
                    partitionIndexList.expire(RedisIndexedMessageStore.this.maxAge, TimeUnit.SECONDS);
                });
                return operations.exec();
            }
        });
        LOG.debug("Redis returned with " + txResults);
    }

    public Set<String> getChannelNames() {
        Set members = this.redisTemplate.boundSetOps((Object)(this.name + "-channels")).members();
        return members;
    }

    public ImmutableSet<Index> getIndexes() {
        return this.indexer.getIndexes();
    }

    public ChannelPosition getLatestChannelPosition(String channelName) {
        Set shardPositions = this.redisTemplate.boundHashOps((Object)(this.name + "-" + channelName + "-channelPos")).entries().entrySet().stream().map(entry -> ShardPosition.fromPosition((String)entry.getKey().toString(), (String)entry.getValue().toString())).collect(Collectors.toSet());
        return ChannelPosition.channelPosition(shardPositions);
    }

    public Stream<MessageStoreEntry> stream() {
        BatchedRedisHashedListIterator<MessageStoreEntry> messageIterator = new BatchedRedisHashedListIterator<MessageStoreEntry>(this.redisTemplate, this::decode, this.name + "-messages", this.batchSize);
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(messageIterator, 1296), false);
    }

    public Stream<MessageStoreEntry> stream(Index index, String value) {
        BatchedRedisHashedListIterator<MessageStoreEntry> messageIterator = new BatchedRedisHashedListIterator<MessageStoreEntry>(this.redisTemplate, this::decode, this.name + "-" + index.getName() + "-" + value, this.batchSize);
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(messageIterator, 1296), false);
    }

    public int size() {
        return this.redisTemplate.boundListOps((Object)(this.name + "-messages")).size().intValue();
    }

    public void close() {
    }

    public void clear() {
        ArrayList<String> keys = new ArrayList<String>(Arrays.asList(this.name + "-channels", this.name + "-messages"));
        this.getChannelNames().forEach(channel -> keys.add(this.name + "-" + channel + "-channelPos"));
        this.redisTemplate.delete(keys);
    }

    private ImmutableMap<String, String> encode(MessageStoreEntry entry) {
        ImmutableMap.Builder builder = ImmutableMap.builder().put((Object)"_channelName", (Object)entry.getChannelName()).put((Object)"_message", this.encoder.apply((Object)entry.getTextMessage()));
        entry.getFilterValues().forEach((key, value) -> builder.put((Object)key.getName(), value));
        return builder.build();
    }

    private MessageStoreEntry decode(Map<String, String> map) {
        Map<Index, String> filterValues = map.entrySet().stream().filter(this::isFilterValue).collect(Collectors.toMap(entry -> Index.valueOf((String)((String)entry.getKey())), entry -> (String)entry.getValue()));
        return MessageStoreEntry.of((String)map.get("_channelName"), (ImmutableMap)ImmutableMap.copyOf(filterValues), (TextMessage)((TextMessage)this.decoder.apply((Object)map.get("_message"))));
    }

    private boolean isFilterValue(Map.Entry<String, String> entry) {
        return !entry.getKey().equals("_channelName") && !entry.getKey().equals("_message");
    }

    private final String messageIdCalculator(TextMessage message) {
        String msgId = message.getHeader().getAsString((HeaderAttr)DefaultHeaderAttr.MSG_ID);
        return msgId != null ? msgId : UUID.randomUUID().toString();
    }
}

