/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.pubsub;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.fugue.FugueLifecycleComponent;
import org.symphonyoss.s2.fugue.FugueLifecycleState;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextFactory;
import org.symphonyoss.s2.fugue.pipeline.FatalConsumerException;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pipeline.RetryableConsumerException;
import org.symphonyoss.s2.fugue.pubsub.ISubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.Subscription;

/**
 * Base class for subscriber managers: collects {@link Subscription}s while the component is
 * being configured, starts each of them in {@link #start()}, and provides a shared
 * {@link #handleMessage} routine that maps consumer failures onto a retry delay.
 *
 * @param <P> the payload type delivered to consumers.
 * @param <T> the concrete manager type, returned from the fluent {@code with...} methods.
 */
public abstract class AbstractSubscriberManager<P, T extends ISubscriberManager<P, T>>
extends FugueLifecycleComponent<T>
implements ISubscriberManager<P, T> {
    /** Delay (ms) returned by {@link #handleMessage} when the unprocessable-message consumer itself fails. */
    protected static final long FAILED_DEAD_LETTER_RETRY_TIME = TimeUnit.HOURS.toMillis(1L);
    /** Delay (ms) returned by {@link #handleMessage} when a consumer fails without supplying its own retry time. */
    protected static final long FAILED_CONSUMER_RETRY_TIME = TimeUnit.SECONDS.toMillis(30L);
    private static final Logger log_ = LoggerFactory.getLogger(AbstractSubscriberManager.class);
    private final ITraceContextFactory traceFactory_;
    private final IThreadSafeConsumer<P> unprocessableMessageConsumer_;
    // Never reassigned; mutated only through the synchronized with... methods of this class.
    private final List<Subscription<P>> subscribers_ = new ArrayList<Subscription<P>>();

    /**
     * @param type                         the concrete type of this manager, passed to the superclass.
     * @param traceFactory                 factory exposed to subclasses via {@link #getTraceFactory()}.
     * @param unprocessableMessageConsumer consumer that receives payloads whose regular consumer
     *                                     threw a {@link FatalConsumerException}.
     */
    protected AbstractSubscriberManager(Class<T> type, ITraceContextFactory traceFactory, IThreadSafeConsumer<P> unprocessableMessageConsumer) {
        super(type);
        this.traceFactory_ = traceFactory;
        this.unprocessableMessageConsumer_ = unprocessableMessageConsumer;
    }

    /**
     * Registers a subscription on a single topic.
     *
     * <p>Calls {@code assertConfigurable()} first; NOTE(review): presumably this rejects
     * registration once the component has left its configurable state - confirm in
     * {@code FugueLifecycleComponent}.
     *
     * @param topicName        the topic to subscribe to.
     * @param subscriptionName the name of the subscription.
     * @param consumer         the consumer to which messages are passed.
     * @return this (fluent method).
     */
    @Override
    public synchronized T withSubscription(String topicName, String subscriptionName, IThreadSafeRetryableConsumer<P> consumer) {
        this.assertConfigurable();
        this.subscribers_.add(new Subscription<P>(topicName, subscriptionName, consumer));
        return (T)((ISubscriberManager)this.self());
    }

    /**
     * Registers a subscription covering a list of topic names.
     *
     * @param topicNames       the topics to subscribe to.
     * @param subscriptionName the name of the subscription.
     * @param consumer         the consumer to which messages are passed.
     * @return this (fluent method).
     */
    @Override
    public synchronized T withSubscriptionsByConfig(List<String> topicNames, String subscriptionName, IThreadSafeRetryableConsumer<P> consumer) {
        this.assertConfigurable();
        this.subscribers_.add(new Subscription<P>(topicNames, subscriptionName, consumer));
        return (T)((ISubscriberManager)this.self());
    }

    /**
     * Starts a single registered subscription; invoked once per subscription from {@link #start()}.
     *
     * @param subscription the subscription to start.
     */
    protected abstract void startSubscription(Subscription<P> subscription);

    /** Stops all subscriptions; invoked from {@link #stop()}. */
    protected abstract void stopSubscriptions();

    /** @return the trace context factory supplied at construction. */
    protected ITraceContextFactory getTraceFactory() {
        return this.traceFactory_;
    }

    /**
     * @return the live internal list of registered subscriptions (not a copy), so changes made
     *         by the caller are visible to this manager.
     */
    protected List<Subscription<P>> getSubscribers() {
        return this.subscribers_;
    }

    /**
     * Moves the component to {@code Starting}, starts every registered subscription in
     * registration order, then moves the component to {@code Running}.
     */
    public synchronized void start() {
        this.setLifeCycleState(FugueLifecycleState.Starting);
        for (Subscription<P> s : this.subscribers_) {
            this.startSubscription(s);
        }
        this.setLifeCycleState(FugueLifecycleState.Running);
    }

    /**
     * Moves the component to {@code Stopping}, stops all subscriptions, then moves the
     * component to {@code Stopped}.
     */
    public synchronized void stop() {
        this.setLifeCycleState(FugueLifecycleState.Stopping);
        this.stopSubscriptions();
        this.setLifeCycleState(FugueLifecycleState.Stopped);
    }

    /**
     * Passes a payload to the given consumer and translates any failure into a retry delay.
     *
     * <ul>
     *   <li>{@link RetryableConsumerException}: returns the retry time carried by the exception,
     *       converted to milliseconds, or {@link #FAILED_CONSUMER_RETRY_TIME} if the exception
     *       has no retry time or no time unit.</li>
     *   <li>Any other {@link RuntimeException}: returns {@link #FAILED_CONSUMER_RETRY_TIME}.</li>
     *   <li>{@link FatalConsumerException}: the payload is handed to the unprocessable-message
     *       consumer; if that consumer throws a {@link RuntimeException},
     *       {@link #FAILED_DEAD_LETTER_RETRY_TIME} is returned.</li>
     * </ul>
     *
     * @param consumer the consumer to invoke.
     * @param payload  the message payload.
     * @param trace    the trace context for this message.
     * @return a retry delay in milliseconds, or {@code -1} when no failure requiring a retry
     *         occurred (NOTE(review): callers presumably treat -1 as "do not redeliver" - confirm
     *         against the concrete subscriber implementations).
     */
    public long handleMessage(IThreadSafeRetryableConsumer<P> consumer, P payload, ITraceContext trace) {
        try {
            consumer.consume(payload, trace);
        }
        catch (RetryableConsumerException e) {
            log_.warn("Unprocessable message, will retry", e);
            if (e.getRetryTime() == null || e.getRetryTimeUnit() == null) {
                return FAILED_CONSUMER_RETRY_TIME;
            }
            return e.getRetryTimeUnit().toMillis(e.getRetryTime());
        }
        catch (RuntimeException e) {
            log_.warn("Unprocessable message, will retry", e);
            return FAILED_CONSUMER_RETRY_TIME;
        }
        catch (FatalConsumerException e) {
            log_.error("Unprocessable message, aborted", e);
            trace.trace("MESSAGE_IS_UNPROCESSABLE");
            try {
                this.unprocessableMessageConsumer_.consume(payload, trace);
            }
            catch (RuntimeException e2) {
                // Log the failure of the unprocessable-message consumer itself (e2); the
                // original fatal exception (e) has already been logged above.
                log_.error("Unprocessable message consumer failed", e2);
                return FAILED_DEAD_LETTER_RETRY_TIME;
            }
        }
        return -1L;
    }
}

