/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.iteration;

import io.reactivex.rxjava3.core.Flowable;
import java.lang.invoke.MethodHandles;
import java.net.SocketAddress;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;
import org.infinispan.client.hotrod.impl.iteration.KeyTracker;
import org.infinispan.client.hotrod.impl.iteration.KeyTrackerFactory;
import org.infinispan.client.hotrod.impl.iteration.RemoteInnerPublisherHandler;
import org.infinispan.client.hotrod.impl.operations.CacheOperationsFactory;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.operations.IterationEndResponse;
import org.infinispan.client.hotrod.impl.operations.IterationNextResponse;
import org.infinispan.client.hotrod.impl.operations.IterationStartResponse;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.transport.netty.OperationDispatcher;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class RemotePublisher<K, E>
implements Publisher<Map.Entry<K, E>> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final CacheOperationsFactory operationsFactory;
    private final OperationDispatcher dispatcher;
    private final String filterConverterFactory;
    private final byte[][] filterParams;
    private final IntSet segments;
    private final int batchSize;
    private final boolean metadata;
    private final KeyTracker segmentKeyTracker;
    private final Set<SocketAddress> failedServers = ConcurrentHashMap.newKeySet();

    public RemotePublisher(CacheOperationsFactory operationsFactory, OperationDispatcher dispatcher, String filterConverterFactory, Object[] filterParams, Set<Integer> segments, int batchSize, boolean metadata, DataFormat dataFormat) {
        this.operationsFactory = operationsFactory;
        this.dispatcher = dispatcher;
        this.filterConverterFactory = filterConverterFactory;
        this.filterParams = operationsFactory.marshallParams(filterParams);
        String cacheName = operationsFactory.getRemoteCache().getName();
        SegmentConsistentHash segmentConsistentHash = (SegmentConsistentHash)dispatcher.getConsistentHash(cacheName);
        if (segments == null) {
            if (segmentConsistentHash != null) {
                int maxSegment = segmentConsistentHash.getNumSegments();
                this.segments = IntSets.concurrentSet((int)maxSegment);
                for (int i = 0; i < maxSegment; ++i) {
                    this.segments.set(i);
                }
            } else {
                this.segments = null;
            }
        } else {
            this.segments = IntSets.concurrentCopyFrom((IntSet)IntSets.from(segments), (int)(Collections.max(segments) + 1));
        }
        this.batchSize = batchSize;
        this.metadata = metadata;
        this.segmentKeyTracker = KeyTrackerFactory.create(dataFormat, segmentConsistentHash, dispatcher.getTopologyId(cacheName), segments);
    }

    public void subscribe(Subscriber<? super Map.Entry<K, E>> subscriber) {
        if (this.segments == null) {
            final AtomicBoolean shouldRetry = new AtomicBoolean(true);
            RemoteInnerPublisherHandler innerHandler = new RemoteInnerPublisherHandler<K, E>(this, this.batchSize, () -> {
                if (shouldRetry.getAndSet(false)) {
                    return new AbstractMap.SimpleImmutableEntry<Object, Object>(null, null);
                }
                return null;
            }, null){

                @Override
                protected void handleThrowableInResponse(Throwable t, Map.Entry<SocketAddress, IntSet> target) {
                    shouldRetry.set(true);
                    super.handleThrowableInResponse(t, target);
                }
            };
            innerHandler.startPublisher().subscribe(subscriber);
            return;
        }
        Flowable.just((Object)this.segments).map(segments -> {
            Map<SocketAddress, Set<Integer>> segmentsByAddress = this.dispatcher.getPrimarySegmentsByAddress(this.operationsFactory.getRemoteCache().getName());
            HashMap<SocketAddress, IntSet> actualTargets = new HashMap<SocketAddress, IntSet>(segmentsByAddress.size());
            for (Map.Entry<SocketAddress, Set<Integer>> entry : segmentsByAddress.entrySet()) {
                SocketAddress targetAddress = entry.getKey();
                if (this.failedServers.contains(targetAddress)) {
                    targetAddress = null;
                }
                IntSet segmentsNeeded = null;
                Set<Integer> targetSegments = entry.getValue();
                for (int targetSegment : targetSegments) {
                    if (!segments.contains(targetSegment)) continue;
                    if (segmentsNeeded == null) {
                        segmentsNeeded = IntSets.mutableEmptySet();
                    }
                    segmentsNeeded.set(targetSegment);
                }
                if (segmentsNeeded == null) continue;
                actualTargets.put(targetAddress, segmentsNeeded);
            }
            if (actualTargets.isEmpty()) {
                actualTargets.put((SocketAddress)null, (IntSet)segments);
            }
            return actualTargets;
        }).flatMap(actualTargets -> {
            int batchSize = this.batchSize / actualTargets.size() + 1;
            return Flowable.fromIterable(actualTargets.entrySet()).map(entry -> {
                log.tracef("Requesting next for: %s", entry);
                RemoteInnerPublisherHandler innerHandler = new RemoteInnerPublisherHandler(this, batchSize, () -> null, (Map.Entry<SocketAddress, IntSet>)entry);
                return innerHandler.startPublisher();
            }).flatMap(RxJavaInterop.identityFunction(), actualTargets.size());
        }).repeatUntil(() -> {
            log.tracef("Segments left to process are %s", this.segments);
            return this.segments.isEmpty();
        }).subscribe(subscriber);
    }

    void erroredServer(SocketAddress socketAddress) {
        if (socketAddress != null) {
            this.failedServers.add(socketAddress);
        }
    }

    CompletionStage<Void> sendCancel(byte[] iterationId, SocketAddress socketAddress) {
        CompletionStage<IterationEndResponse> endResponseStage = this.dispatcher.executeOnSingleAddress(this.operationsFactory.newIterationEndOperation(iterationId), socketAddress);
        return endResponseStage.handle((endResponse, t) -> {
            if (t != null) {
                Log.HOTROD.ignoringErrorDuringIterationClose(this.iterationId(iterationId), (Throwable)t);
            } else {
                short status = endResponse.getStatus();
                if (HotRodConstants.isSuccess(status) && Log.HOTROD.isDebugEnabled()) {
                    Log.HOTROD.iterationClosed(this.iterationId(iterationId));
                }
                if (HotRodConstants.isInvalidIteration(status)) {
                    throw Log.HOTROD.errorClosingIteration(this.iterationId(iterationId));
                }
            }
            return null;
        });
    }

    String iterationId(byte[] iterationId) {
        return new String(iterationId, HotRodConstants.HOTROD_STRING_CHARSET);
    }

    void completeSegments(IntSet completedSegments) {
        if (this.segments != null) {
            this.segments.removeAll(completedSegments);
        }
    }

    CompletionStage<IterationStartResponse> newIteratorStartOperation(SocketAddress address, IntSet segments, int batchSize) {
        HotRodOperation<IterationStartResponse> op = this.operationsFactory.newIterationStartOperation(this.filterConverterFactory, this.filterParams, segments, batchSize, this.metadata);
        if (address == null) {
            return this.dispatcher.execute(op);
        }
        return this.dispatcher.executeOnSingleAddress(op, address);
    }

    CompletionStage<IterationNextResponse<K, E>> newIteratorNextOperation(byte[] iterationId, SocketAddress socketAddress) {
        HotRodOperation op = this.operationsFactory.newIterationNextOperation(iterationId, this.segmentKeyTracker);
        return this.dispatcher.executeOnSingleAddress(op, socketAddress);
    }
}

