/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.hotrod.impl.operations;

import io.netty.channel.Channel;
import io.netty.handler.codec.DecoderException;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.infinispan.api.common.CacheOptions;
import org.infinispan.hotrod.exceptions.HotRodClientException;
import org.infinispan.hotrod.exceptions.RemoteIllegalLifecycleStateException;
import org.infinispan.hotrod.exceptions.RemoteNodeSuspectException;
import org.infinispan.hotrod.exceptions.TransportException;
import org.infinispan.hotrod.impl.DataFormat;
import org.infinispan.hotrod.impl.logging.Log;
import org.infinispan.hotrod.impl.logging.LogFactory;
import org.infinispan.hotrod.impl.operations.HotRodOperation;
import org.infinispan.hotrod.impl.operations.OperationContext;
import org.infinispan.hotrod.impl.transport.netty.ChannelOperation;
import org.infinispan.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.hotrod.impl.transport.netty.HeaderDecoder;

public abstract class RetryOnFailureOperation<T>
extends HotRodOperation<T>
implements ChannelOperation {
    protected static final Log log = LogFactory.getLog(RetryOnFailureOperation.class, Log.class);
    private int retryCount = 0;
    private Set<SocketAddress> failedServers = null;

    protected RetryOnFailureOperation(OperationContext operationContext, short requestCode, short responseCode, CacheOptions options, DataFormat dataFormat) {
        super(operationContext, requestCode, responseCode, options, dataFormat);
    }

    @Override
    public CompletionStage<T> execute() {
        assert (!this.isDone());
        try {
            if (log.isTraceEnabled()) {
                log.tracef("Requesting channel for operation %s", this);
            }
            this.fetchChannelAndInvoke(this.retryCount, this.failedServers);
        }
        catch (Exception e) {
            this.completeExceptionally(e);
        }
        return this;
    }

    @Override
    public void invoke(Channel channel) {
        try {
            if (log.isTraceEnabled()) {
                log.tracef("About to start executing operation %s on %s", this, channel);
            }
            this.executeOperation(channel);
        }
        catch (Throwable t) {
            this.completeExceptionally(t);
        }
        finally {
            this.releaseChannel(channel);
        }
    }

    @Override
    public void cancel(SocketAddress address, Throwable cause) {
        if ((cause = this.handleException(cause, null, address)) != null) {
            this.completeExceptionally(cause);
        }
    }

    private void retryIfNotDone() {
        if (this.isDone()) {
            if (log.isTraceEnabled()) {
                log.tracef("Not retrying as done (exceptionally=%s), retryCount=%d", this.isCompletedExceptionally(), this.retryCount);
            }
        } else {
            this.reset();
            this.fetchChannelAndInvoke(this.retryCount, this.failedServers);
        }
    }

    protected void reset() {
        if (this.timeoutFuture != null) {
            this.timeoutFuture.cancel(false);
            this.timeoutFuture = null;
        }
        this.header.topologyAge(this.operationContext.getChannelFactory().getTopologyAge());
    }

    private Set<SocketAddress> addFailedServer(SocketAddress address) {
        if (this.failedServers == null) {
            this.failedServers = new HashSet<SocketAddress>();
        }
        if (log.isTraceEnabled()) {
            log.tracef("Add %s to failed servers", address);
        }
        this.failedServers.add(address);
        return this.failedServers;
    }

    @Override
    public void channelInactive(Channel channel) {
        if (this.isDone()) {
            return;
        }
        SocketAddress address = ChannelRecord.of(channel).getUnresolvedAddress();
        this.addFailedServer(address);
        this.logAndRetryOrFail(Log.HOTROD.connectionClosed(address, address));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void exceptionCaught(Channel channel, Throwable cause) {
        SocketAddress address = channel == null ? null : ChannelRecord.of(channel).getUnresolvedAddress();
        if ((cause = this.handleException(cause, channel, address)) != null) {
            try {
                this.completeExceptionally(cause);
            }
            finally {
                if (channel != null) {
                    Log.HOTROD.closingChannelAfterError(channel, cause);
                    channel.close();
                }
            }
        }
    }

    protected Throwable handleException(Throwable cause, Channel channel, SocketAddress address) {
        while (cause instanceof DecoderException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        if (cause instanceof RemoteIllegalLifecycleStateException || cause instanceof IOException || cause instanceof TransportException) {
            if (Thread.interrupted()) {
                this.completeExceptionally(new InterruptedException());
                return null;
            }
            if (address != null) {
                this.addFailedServer(address);
            }
            if (channel != null) {
                HeaderDecoder headerDecoder = (HeaderDecoder)channel.pipeline().get("header-decoder");
                if (headerDecoder != null) {
                    channel.pipeline().remove("header-decoder");
                }
                Log.HOTROD.closingChannelAfterError(channel, cause);
                channel.close();
                if (headerDecoder != null) {
                    headerDecoder.failoverClientListeners();
                }
            }
            this.logAndRetryOrFail(cause);
            return null;
        }
        if (cause instanceof RemoteNodeSuspectException) {
            this.logAndRetryOrFail(cause);
            return null;
        }
        if (cause instanceof HotRodClientException && ((HotRodClientException)cause).isServerError()) {
            this.completeExceptionally(cause);
            return null;
        }
        return cause;
    }

    protected void logAndRetryOrFail(Throwable e) {
        if (this.retryCount < this.operationContext.getChannelFactory().getMaxRetries()) {
            if (log.isTraceEnabled()) {
                log.tracef(e, "Exception encountered in %s. Retry %d out of %d", this, this.retryCount, this.operationContext.getChannelFactory().getMaxRetries());
            }
            ++this.retryCount;
            this.operationContext.getChannelFactory().incrementRetryCount();
            this.retryIfNotDone();
        } else {
            Log.HOTROD.exceptionAndNoRetriesLeft(this.retryCount, this.operationContext.getChannelFactory().getMaxRetries(), e);
            this.completeExceptionally(e);
        }
    }

    protected void fetchChannelAndInvoke(int retryCount, Set<SocketAddress> failedServers) {
        this.operationContext.getChannelFactory().fetchChannelAndInvoke(failedServers, this.operationContext.getCacheNameBytes(), this);
    }

    protected abstract void executeOperation(Channel var1);
}

