/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.pubsub;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.fugue.FugueLifecycleState;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextFactory;
import org.symphonyoss.s2.fugue.pipeline.FatalConsumerException;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeErrorConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pipeline.RetryableConsumerException;
import org.symphonyoss.s2.fugue.pubsub.AbstractSubscriberBase;
import org.symphonyoss.s2.fugue.pubsub.ISubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.Subscription;

public abstract class AbstractSubscriberManager<P, T extends ISubscriberManager<P, T>>
extends AbstractSubscriberBase<P, T>
implements ISubscriberManager<P, T> {
    protected static final long FAILED_DEAD_LETTER_RETRY_TIME = TimeUnit.HOURS.toMillis(1L);
    protected static final long FAILED_CONSUMER_RETRY_TIME = TimeUnit.SECONDS.toMillis(30L);
    protected static final long MESSAGE_PROCESSED_OK = -1L;
    private static final Logger log_ = LoggerFactory.getLogger(AbstractSubscriberManager.class);
    private static final Integer FAILURE_CNT_LIMIT = 5;
    private final ITraceContextFactory traceFactory_;
    private final IThreadSafeErrorConsumer<P> unprocessableMessageConsumer_;
    private Cache<String, Integer> failureCache_ = CacheBuilder.newBuilder().maximumSize(5000L).expireAfterAccess(30L, TimeUnit.MINUTES).build();

    protected AbstractSubscriberManager(Class<T> type, ITraceContextFactory traceFactory, IThreadSafeErrorConsumer<P> unprocessableMessageConsumer) {
        super(type);
        this.traceFactory_ = traceFactory;
        this.unprocessableMessageConsumer_ = unprocessableMessageConsumer;
    }

    @Override
    public T withSubscription(IThreadSafeRetryableConsumer<P> consumer, String subscriptionName, String topicName, String ... additionalTopicNames) {
        return (T)((ISubscriberManager)super.withSubscription(consumer, subscriptionName, topicName, additionalTopicNames));
    }

    @Override
    public T withSubscription(IThreadSafeRetryableConsumer<P> consumer, String subscriptionName, List<String> topicNames) {
        return (T)((ISubscriberManager)super.withSubscription(consumer, subscriptionName, topicNames));
    }

    @Override
    public T withSubscription(IThreadSafeRetryableConsumer<P> consumer, String subscriptionName, String[] topicNames) {
        return (T)((ISubscriberManager)super.withSubscription(consumer, subscriptionName, topicNames));
    }

    protected abstract void startSubscription(Subscription<P> var1);

    protected abstract void stopSubscriptions();

    protected ITraceContextFactory getTraceFactory() {
        return this.traceFactory_;
    }

    public synchronized void start() {
        this.setLifeCycleState(FugueLifecycleState.Starting);
        for (Subscription s : this.getSubscribers()) {
            this.startSubscription(s);
        }
        this.setLifeCycleState(FugueLifecycleState.Running);
    }

    public synchronized void stop() {
        this.setLifeCycleState(FugueLifecycleState.Stopping);
        this.stopSubscriptions();
        this.setLifeCycleState(FugueLifecycleState.Stopped);
    }

    public long handleMessage(IThreadSafeRetryableConsumer<P> consumer, P payload, ITraceContext trace, String messageId) {
        try {
            consumer.consume(payload, trace);
        }
        catch (RetryableConsumerException e) {
            log_.warn("Unprocessable message, will retry", (Throwable)e);
            if (e.getRetryTime() == null || e.getRetryTimeUnit() == null) {
                return this.retryMessage(payload, trace, e, messageId, FAILED_CONSUMER_RETRY_TIME);
            }
            return this.retryMessage(payload, trace, e, messageId, e.getRetryTimeUnit().toMillis(e.getRetryTime()));
        }
        catch (RuntimeException e) {
            return this.retryMessage(payload, trace, e, messageId, FAILED_CONSUMER_RETRY_TIME);
        }
        catch (FatalConsumerException e) {
            log_.error("Unprocessable message, aborted", (Throwable)e);
            trace.trace("MESSAGE_IS_UNPROCESSABLE");
            return this.abortMessage(payload, trace, e);
        }
        return -1L;
    }

    private long retryMessage(P payload, ITraceContext trace, Throwable cause, String messageId, long retryTime) {
        Integer cnt = (Integer)this.failureCache_.getIfPresent((Object)messageId);
        if (cnt == null) {
            cnt = 1;
            this.failureCache_.put((Object)messageId, (Object)cnt);
        } else {
            if (cnt >= FAILURE_CNT_LIMIT) {
                trace.trace("MESSAGE_RETRIES_EXCEEDED");
                return this.abortMessage(payload, trace, cause);
            }
            cnt = cnt + 1;
            this.failureCache_.put((Object)messageId, (Object)cnt);
        }
        log_.warn("Message processing failed " + cnt + " times, will retry", cause);
        return retryTime;
    }

    private long abortMessage(P payload, ITraceContext trace, Throwable e) {
        try {
            this.unprocessableMessageConsumer_.consume(payload, trace, e.getLocalizedMessage(), e);
            return -1L;
        }
        catch (RuntimeException e2) {
            log_.error("Unprocessable message consumer failed", e);
            return FAILED_DEAD_LETTER_RETRY_TIME;
        }
    }
}

