/*
 * Decompiled with CFR 0.152.
 */
package de.otto.synapse.messagestore.redis;

import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.messagestore.WritableMessageStore;
import de.otto.synapse.messagestore.redis.BatchedRedisListIterator;
import de.otto.synapse.translator.Decoder;
import de.otto.synapse.translator.Encoder;
import de.otto.synapse.translator.MessageFormat;
import de.otto.synapse.translator.TextDecoder;
import de.otto.synapse.translator.TextEncoder;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;

/**
 * A {@link WritableMessageStore} that keeps the messages of a single channel in Redis.
 *
 * <p>Two Redis keys are used per channel:
 * <ul>
 *   <li>{@code <channelName>-messages}: a list holding the encoded messages in insertion order,
 *       trimmed after every {@link #add(TextMessage)} so that it works like a ring buffer;</li>
 *   <li>{@code <channelName>-channelPos}: a hash mapping shard names to the position of the
 *       most recently added message that carried a shard position.</li>
 * </ul>
 */
public class RedisMessageStore implements WritableMessageStore {
    /** Suffix appended to the channel name to build the key of the message list. */
    private static final String MESSAGES_KEY_SUFFIX = "-messages";
    /** Suffix appended to the channel name to build the key of the shard-position hash. */
    private static final String CHANNEL_POSITION_KEY_SUFFIX = "-channelPos";
    /** Spliterator characteristics reported by {@link #stream()} (numerically 1296). */
    private static final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.IMMUTABLE;

    private final String channelName;
    private final String messagesKey;
    private final String channelPositionKey;
    private final RedisTemplate<String, String> redisTemplate;
    private final int batchSize;
    private final int maxSize;
    private final Encoder<String> encoder;
    private final Decoder<String> decoder;

    /**
     * Creates a store that encodes messages with a {@link TextEncoder} using {@link MessageFormat#V2}
     * and decodes them with a default {@link TextDecoder}.
     *
     * @param channelName         name of the channel; also the prefix of the Redis keys
     * @param batchSize           batch size handed to the {@link BatchedRedisListIterator} used by {@link #stream()}
     * @param ringBufferSize      number of most recent messages retained in the Redis list
     * @param stringRedisTemplate template used for all Redis access
     */
    public RedisMessageStore(String channelName, int batchSize, int ringBufferSize, RedisTemplate<String, String> stringRedisTemplate) {
        this(channelName, batchSize, ringBufferSize, stringRedisTemplate, new TextEncoder(MessageFormat.V2), new TextDecoder());
    }

    /**
     * Creates a store with explicit message encoder and decoder.
     *
     * @param channelName         name of the channel; also the prefix of the Redis keys
     * @param batchSize           batch size handed to the {@link BatchedRedisListIterator} used by {@link #stream()}
     * @param ringBufferSize      number of most recent messages retained in the Redis list
     * @param stringRedisTemplate template used for all Redis access
     * @param messageEncoder      converts a message into the string stored in Redis
     * @param messageDecoder      handed to the iterator that reads stored strings back as messages
     */
    public RedisMessageStore(String channelName, int batchSize, int ringBufferSize, RedisTemplate<String, String> stringRedisTemplate, Encoder<String> messageEncoder, Decoder<String> messageDecoder) {
        this.channelName = channelName;
        // Build both keys once instead of concatenating them on every Redis call.
        this.messagesKey = channelName + MESSAGES_KEY_SUFFIX;
        this.channelPositionKey = channelName + CHANNEL_POSITION_KEY_SUFFIX;
        this.redisTemplate = stringRedisTemplate;
        this.batchSize = batchSize;
        this.maxSize = ringBufferSize;
        this.encoder = messageEncoder;
        this.decoder = messageDecoder;
    }

    /**
     * @return the name of the channel this store was created for
     */
    public String getChannelName() {
        return this.channelName;
    }

    /**
     * Appends a message to the store.
     *
     * <p>Between {@code MULTI} and {@code EXEC} the following commands are queued: the shard position
     * of the message (if present) is written to the channel-position hash, the encoded message is
     * pushed to the tail of the message list, and the list is trimmed to its last {@code maxSize} entries.
     *
     * @param message the message to store
     */
    public void add(final TextMessage message) {
        // The per-command results returned by exec() are not needed by this store.
        this.redisTemplate.execute(new SessionCallback<List<Object>>() {

            // SessionCallback#execute is a generic method; the raw RedisOperations parameter is the
            // usual way to implement it from an anonymous class, hence the suppressed warnings.
            @SuppressWarnings({"unchecked", "rawtypes"})
            public List<Object> execute(RedisOperations operations) throws DataAccessException {
                operations.multi();
                // Remember the latest position per shard of this channel.
                message.getHeader().getShardPosition().ifPresent(shardPosition ->
                        operations.boundHashOps(channelPositionKey).put(shardPosition.shardName(), shardPosition.position()));
                // Keep messages in insertion order ...
                operations.boundListOps(messagesKey).rightPush(encoder.apply(message));
                // ... and retain only the newest maxSize entries (negative indices count from the tail).
                // NOTE(review): with maxSize == 0 the range becomes 0..-1, which would keep the
                // whole list rather than emptying it - confirm that callers never pass 0.
                operations.boundListOps(messagesKey).trim(-maxSize, -1L);
                return operations.exec();
            }
        });
    }

    /**
     * Builds the channel position from all shard positions stored in the channel-position hash.
     *
     * @return the channel position composed of one {@link ShardPosition} per hash entry
     */
    public ChannelPosition getLatestChannelPosition() {
        final Set<ShardPosition> shardPositions = this.redisTemplate
                .boundHashOps(this.channelPositionKey)
                .entries()
                .entrySet()
                .stream()
                .map(entry -> ShardPosition.fromPosition(entry.getKey().toString(), entry.getValue().toString()))
                .collect(Collectors.toSet());
        return ChannelPosition.channelPosition(shardPositions);
    }

    /**
     * Returns a sequential stream over the stored messages, backed by a
     * {@link BatchedRedisListIterator} configured with this store's batch size.
     *
     * @return a non-parallel stream of the messages in the Redis list
     */
    public Stream<TextMessage> stream() {
        final BatchedRedisListIterator messageIterator = new BatchedRedisListIterator(this.redisTemplate, this.decoder, this.messagesKey, this.batchSize);
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(messageIterator, CHARACTERISTICS), false);
    }

    /**
     * @return the current length of the Redis message list, narrowed to {@code int}
     */
    public int size() {
        return this.redisTemplate.boundListOps(this.messagesKey).size().intValue();
    }

    /**
     * Does nothing; this store holds no resources of its own that need releasing.
     */
    public void close() {
    }

    /**
     * Deletes both Redis keys of this channel (shard positions and messages).
     */
    public void clear() {
        this.redisTemplate.delete(Arrays.asList(this.channelPositionKey, this.messagesKey));
    }
}

