/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.hotrod.streaming;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Ticker;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.time.TimeServiceTicker;
import org.infinispan.commons.util.ByRef;
import org.infinispan.metadata.Metadata;
import org.infinispan.server.core.logging.Log;
import org.infinispan.server.hotrod.streaming.GetStreamResponse;
import org.infinispan.server.hotrod.streaming.GetStreamingState;
import org.infinispan.server.hotrod.streaming.PutStreamingState;
import org.infinispan.server.hotrod.streaming.StreamingState;
import org.infinispan.util.concurrent.WithinThreadExecutor;

/**
 * Tracks the in-flight streaming GET and PUT operations of the Hot Rod server.
 *
 * <p>Every started stream is registered under an integer id in a Caffeine-backed map.
 * Entries that have not been accessed for {@link #IDLE_TIMEOUT_MINUTES} minutes expire.
 * Whenever an entry leaves the map (explicit removal or eviction) the removal listener
 * calls {@link StreamingState#close()} on it; evictions are additionally logged through
 * {@code removedUnclosedIterator}.
 *
 * <p>Ids come from a single static counter, so they are shared by all instances of this
 * class in the same class loader.
 */
public class StreamingManager {
    private static final Log log = LogFactory.getLog(StreamingManager.class, Log.class);

    /** Source of stream ids; static, hence shared between all manager instances. */
    private static final AtomicInteger globalIterationId = new AtomicInteger();

    /** Idle time, in minutes, after which an untouched stream entry expires. */
    private static final long IDLE_TIMEOUT_MINUTES = 5L;

    /** Live streams keyed by their id; a view over the Caffeine cache built in the constructor. */
    private final Map<Integer, StreamingState> iterationStateMap;

    /**
     * Creates a manager whose expiration clock is driven by the given time service.
     *
     * @param timeService time source wrapped in a {@link TimeServiceTicker} and handed to Caffeine
     */
    public StreamingManager(TimeService timeService) {
        // Explicit type witnesses keep the builder fully typed: the listener lambda sees
        // (Integer, StreamingState, cause) instead of raw Objects, and no unchecked
        // conversion is needed when the map view is assigned.
        this.iterationStateMap = Caffeine.newBuilder()
                .expireAfterAccess(IDLE_TIMEOUT_MINUTES, TimeUnit.MINUTES)
                .<Integer, StreamingState>removalListener((key, value, cause) -> {
                    if (value != null) {
                        value.close();
                    }
                    // Only evictions (not explicit removals) indicate a stream the client never closed.
                    if (cause.wasEvicted()) {
                        log.removedUnclosedIterator(key.toString());
                    }
                })
                .ticker(new TimeServiceTicker(timeService))
                // NOTE(review): presumably makes cache maintenance/listener callbacks run on the
                // calling thread - confirm against WithinThreadExecutor.
                .executor(new WithinThreadExecutor())
                .<Integer, StreamingState>build()
                .asMap();
    }

    /**
     * Registers a new GET stream and produces its first response.
     *
     * @param key       key passed to the new {@link GetStreamingState}
     * @param value     value passed to the new {@link GetStreamingState}
     * @param channel   channel passed to the new {@link GetStreamingState}
     * @param batchSize batch size passed to the new {@link GetStreamingState}
     * @return a response carrying the new stream id, the result of the first {@code nextGet()}
     *         and the state's completion flag
     */
    public GetStreamResponse startGetStream(byte[] key, byte[] value, Channel channel, int batchSize) {
        GetStreamingState state = new GetStreamingState(key, channel, value, batchSize);
        int id = globalIterationId.getAndIncrement();
        this.iterationStateMap.put(id, state);
        return new GetStreamResponse(id, state.nextGet(), state.isGetComplete());
    }

    /**
     * Produces the next response of a previously started GET stream.
     *
     * @param streamId id returned by {@link #startGetStream}
     * @return the next response, or {@code null} when no stream is registered under that id
     */
    public GetStreamResponse nextGetStream(Integer streamId) {
        StreamingState state = this.iterationStateMap.get(streamId);
        if (state == null) {
            return null;
        }
        return new GetStreamResponse(streamId, state.nextGet(), state.isGetComplete());
    }

    /**
     * Closes a GET stream and removes it from the map; does nothing for an unknown id.
     *
     * @param iterationId id of the stream to close
     */
    public void closeGetStream(Integer iterationId) {
        this.iterationStateMap.computeIfPresent(iterationId, (k, v) -> {
            v.closeGet();
            // Returning null removes the mapping.
            return null;
        });
    }

    /**
     * Registers a new PUT stream.
     *
     * @param key      key passed to the new {@link PutStreamingState}
     * @param channel  channel passed to the new {@link PutStreamingState}
     * @param metadata metadata builder passed to the new {@link PutStreamingState}
     * @param version  version passed to the new {@link PutStreamingState}
     * @return the id under which the stream was registered
     */
    public int startPutStream(byte[] key, Channel channel, Metadata.Builder metadata, long version) {
        PutStreamingState state = new PutStreamingState(key, channel, metadata, version);
        int id = globalIterationId.getAndIncrement();
        this.iterationStateMap.put(id, state);
        return id;
    }

    /**
     * Feeds one chunk into a PUT stream.
     *
     * <p>For the last chunk the state is updated and unregistered inside a single
     * {@code computeIfPresent} call; for any other chunk the state stays registered.
     *
     * @param streamId  id returned by {@link #startPutStream}
     * @param lastChunk whether this is the final chunk of the stream
     * @param buf       chunk handed to {@code nextPut}
     * @return the state the chunk was applied to, or {@code null} when the id is unknown
     */
    public StreamingState nextPutStream(Integer streamId, boolean lastChunk, ByteBuf buf) {
        if (lastChunk) {
            // Holder lets the lambda hand the removed state back to the caller.
            ByRef<StreamingState> ref = new ByRef<>(null);
            this.iterationStateMap.computeIfPresent(streamId, (k, v) -> {
                v.nextPut(buf);
                ref.set(v);
                // Returning null removes the mapping.
                return null;
            });
            return ref.get();
        }
        StreamingState state = this.iterationStateMap.get(streamId);
        if (state == null) {
            return null;
        }
        state.nextPut(buf);
        return state;
    }

    /**
     * Closes a PUT stream and removes it from the map; does nothing for an unknown id.
     *
     * @param streamId id of the stream to close
     */
    public void closePutStream(Integer streamId) {
        this.iterationStateMap.computeIfPresent(streamId, (k, v) -> {
            v.closePut();
            // Returning null removes the mapping.
            return null;
        });
    }
}

