/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.pubsub;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.common.fault.FaultAccumulator;
import org.symphonyoss.s2.common.fault.TransientTransactionFault;
import org.symphonyoss.s2.common.fluent.BaseAbstractBuilder;
import org.symphonyoss.s2.fugue.FugueLifecycleComponent;
import org.symphonyoss.s2.fugue.FugueLifecycleState;
import org.symphonyoss.s2.fugue.config.IConfiguration;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextTransactionFactory;
import org.symphonyoss.s2.fugue.counter.ICounter;
import org.symphonyoss.s2.fugue.naming.INameFactory;
import org.symphonyoss.s2.fugue.pipeline.FatalConsumerException;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeErrorConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pipeline.RetryableConsumerException;
import org.symphonyoss.s2.fugue.pubsub.ISubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.ISubscriberManagerBuilder;
import org.symphonyoss.s2.fugue.pubsub.ISubscription;

public abstract class AbstractSubscriberManager<P, T extends AbstractSubscriberManager<P, T>>
extends FugueLifecycleComponent<T>
implements ISubscriberManager<T> {
    protected static final long FAILED_DEAD_LETTER_RETRY_TIME = TimeUnit.HOURS.toMillis(1L);
    protected static final long FAILED_CONSUMER_RETRY_TIME = TimeUnit.SECONDS.toMillis(30L);
    protected static final long MESSAGE_PROCESSED_OK = -1L;
    private static final Logger log_ = LoggerFactory.getLogger(AbstractSubscriberManager.class);
    private static final Integer FAILURE_CNT_LIMIT = 25;
    protected final INameFactory nameFactory_;
    protected final ImmutableList<ISubscription<P>> subscribers_;
    protected final ITraceContextTransactionFactory traceFactory_;
    protected final IThreadSafeErrorConsumer<P> unprocessableMessageConsumer_;
    protected final IConfiguration config_;
    protected final ICounter counter_;
    protected final int totalSubscriptionCnt_;
    private Cache<String, Integer> failureCache_ = CacheBuilder.newBuilder().maximumSize(5000L).expireAfterAccess(30L, TimeUnit.MINUTES).build();

    protected AbstractSubscriberManager(Class<T> type, Builder<?, P, T> builder) {
        super(type);
        this.nameFactory_ = builder.nameFactory_;
        this.subscribers_ = ImmutableList.copyOf(builder.subscribers_);
        this.traceFactory_ = builder.traceFactory_;
        this.unprocessableMessageConsumer_ = builder.unprocessableMessageConsumer_;
        this.config_ = builder.config_;
        this.counter_ = builder.counter_;
        this.totalSubscriptionCnt_ = builder.totalSubscriptionCnt_;
    }

    @Override
    public int getTotalSubscriptionCnt() {
        return this.totalSubscriptionCnt_;
    }

    protected ICounter getCounter() {
        return this.counter_;
    }

    protected abstract void initSubscription(ISubscription<P> var1);

    protected abstract void startSubscriptions();

    protected abstract void stopSubscriptions();

    protected ITraceContextTransactionFactory getTraceFactory() {
        return this.traceFactory_;
    }

    public synchronized void start() {
        this.setLifeCycleState(FugueLifecycleState.Starting);
        for (ISubscription s : this.subscribers_) {
            this.initSubscription(s);
        }
        this.startSubscriptions();
        this.setLifeCycleState(FugueLifecycleState.Running);
    }

    public void quiesce() {
        this.setLifeCycleState(FugueLifecycleState.Quiescing);
        this.stopSubscriptions();
        this.setLifeCycleState(FugueLifecycleState.Quiescent);
    }

    public synchronized void stop() {
        this.setLifeCycleState(FugueLifecycleState.Stopping);
        this.stopSubscriptions();
        this.setLifeCycleState(FugueLifecycleState.Stopped);
    }

    public long handleMessage(IThreadSafeRetryableConsumer<P> consumer, P payload, ITraceContext trace, String messageId) {
        try {
            consumer.consume(payload, trace);
        }
        catch (TransientTransactionFault e) {
            log_.warn("Transient processing failure, will retry (forever)", (Throwable)e);
            return this.retryMessage(payload, trace, e, messageId, e.getRetryTime(), e.getRetryTimeUnit(), true);
        }
        catch (RetryableConsumerException e) {
            log_.warn("Transient processing failure, will retry (forever)", (Throwable)e);
            return this.retryMessage(payload, trace, e, messageId, e.getRetryTime(), e.getRetryTimeUnit(), true);
        }
        catch (RuntimeException e) {
            return this.retryMessage(payload, trace, e, messageId, FAILED_CONSUMER_RETRY_TIME, false);
        }
        catch (FatalConsumerException e) {
            log_.error("Unprocessable message, aborted", (Throwable)e);
            trace.trace("MESSAGE_IS_UNPROCESSABLE");
            return this.abortMessage(payload, trace, e);
        }
        return -1L;
    }

    private long retryMessage(P payload, ITraceContext trace, Throwable cause, String messageId, Long retryTime, TimeUnit retryTimeUnit, boolean retryForever) {
        long delay = retryTime == null || retryTimeUnit == null ? FAILED_CONSUMER_RETRY_TIME : retryTimeUnit.toMillis(retryTime);
        return this.retryMessage(payload, trace, cause, messageId, delay, retryForever);
    }

    private long retryMessage(P payload, ITraceContext trace, Throwable cause, String messageId, long retryTime, boolean retryForever) {
        if (!retryForever) {
            Integer cnt = (Integer)this.failureCache_.getIfPresent((Object)messageId);
            if (cnt == null) {
                cnt = 1;
                this.failureCache_.put((Object)messageId, (Object)cnt);
            } else {
                if (cnt >= FAILURE_CNT_LIMIT) {
                    log_.error("Retryable processing error failed " + cnt + " times, aborting messageId " + messageId);
                    trace.trace("MESSAGE_RETRIES_EXCEEDED");
                    this.failureCache_.invalidate((Object)messageId);
                    return this.abortMessage(payload, trace, cause);
                }
                cnt = cnt + 1;
                this.failureCache_.put((Object)messageId, (Object)cnt);
            }
            log_.warn("Message processing failed " + cnt + " times, will retry", cause);
        }
        return retryTime;
    }

    private long abortMessage(P payload, ITraceContext trace, Throwable e) {
        try {
            this.unprocessableMessageConsumer_.consume(payload, trace, e.getLocalizedMessage(), e);
            return -1L;
        }
        catch (RuntimeException e2) {
            log_.error("Unprocessable message consumer failed", e);
            return FAILED_DEAD_LETTER_RETRY_TIME;
        }
    }

    protected static abstract class Builder<T extends Builder<T, P, B>, P, B extends AbstractSubscriberManager<P, B>>
    extends BaseAbstractBuilder<T, B>
    implements ISubscriberManagerBuilder<T, P, B> {
        protected INameFactory nameFactory_;
        protected List<ISubscription<P>> subscribers_ = new LinkedList<ISubscription<P>>();
        protected ITraceContextTransactionFactory traceFactory_;
        protected IThreadSafeErrorConsumer<P> unprocessableMessageConsumer_;
        protected IConfiguration config_;
        protected ICounter counter_;
        protected int totalSubscriptionCnt_;

        protected Builder(Class<T> type) {
            super(type);
        }

        @Override
        public T withNameFactory(INameFactory nameFactory) {
            this.nameFactory_ = nameFactory;
            return (T)((Builder)this.self());
        }

        @Override
        public T withSubscription(ISubscription<P> subscription) {
            this.subscribers_.add(subscription);
            this.totalSubscriptionCnt_ += subscription.getSubscriptionNames().size();
            return (T)((Builder)this.self());
        }

        @Override
        public T withConfig(IConfiguration config) {
            this.config_ = config;
            return (T)((Builder)this.self());
        }

        @Override
        public T withCounter(ICounter counter) {
            this.counter_ = counter;
            return (T)((Builder)this.self());
        }

        @Override
        public T withTraceContextTransactionFactory(ITraceContextTransactionFactory traceFactory) {
            this.traceFactory_ = traceFactory;
            return (T)((Builder)this.self());
        }

        public ITraceContextTransactionFactory getTraceContextTransactionFactory() {
            return this.traceFactory_;
        }

        @Override
        public T withUnprocessableMessageConsumer(IThreadSafeErrorConsumer<P> unprocessableMessageConsumer) {
            this.unprocessableMessageConsumer_ = unprocessableMessageConsumer;
            return (T)((Builder)this.self());
        }

        public void validate(FaultAccumulator faultAccumulator) {
            super.validate(faultAccumulator);
            faultAccumulator.checkNotNull((Object)this.traceFactory_, "traceFactory");
            faultAccumulator.checkNotNull(this.unprocessableMessageConsumer_, "unprocessableMessageConsumer");
            faultAccumulator.checkNotNull((Object)this.config_, "config");
        }
    }
}

