/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.pubsub;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.fugue.FugueLifecycleComponent;
import org.symphonyoss.s2.fugue.FugueLifecycleState;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.core.trace.ITraceContextFactory;
import org.symphonyoss.s2.fugue.pipeline.FatalConsumerException;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeRetryableConsumer;
import org.symphonyoss.s2.fugue.pipeline.RetryableConsumerException;

public abstract class AbstractSubscriberManager<P, T extends AbstractSubscriberManager<P, T>>
extends FugueLifecycleComponent<T> {
    protected static final long FAILED_DEAD_LETTER_RETRY_TIME = TimeUnit.HOURS.toMillis(1L);
    protected static final long FAILED_CONSUMER_RETRY_TIME = TimeUnit.SECONDS.toMillis(30L);
    private static final Logger log_ = LoggerFactory.getLogger(AbstractSubscriberManager.class);
    private final ITraceContextFactory traceFactory_;
    private final IThreadSafeRetryableConsumer<P> consumer_;
    private final IThreadSafeConsumer<P> unprocessableMessageConsumer_;
    private Map<String, Set<String>> subscriptionsByTopic_ = new HashMap<String, Set<String>>();
    private Map<String, Set<String>> topicsBySubscription_ = new HashMap<String, Set<String>>();

    public AbstractSubscriberManager(Class<T> type, ITraceContextFactory traceFactory, IThreadSafeRetryableConsumer<P> consumer, IThreadSafeConsumer<P> unprocessableMessageConsumer) {
        super(type);
        this.traceFactory_ = traceFactory;
        this.consumer_ = consumer;
        this.unprocessableMessageConsumer_ = unprocessableMessageConsumer;
    }

    public synchronized T withSubscription(String topicName, String subscriptionName) {
        this.assertConfigurable();
        Set<String> set = this.subscriptionsByTopic_.get(topicName);
        if (set == null) {
            set = new HashSet<String>();
            this.subscriptionsByTopic_.put(topicName, set);
        }
        if (!set.add(subscriptionName)) {
            throw new IllegalArgumentException("Subscription " + subscriptionName + " on topic " + topicName + " already exists.");
        }
        set = this.topicsBySubscription_.get(subscriptionName);
        if (set == null) {
            set = new HashSet<String>();
            this.topicsBySubscription_.put(subscriptionName, set);
        }
        set.add(topicName);
        return (T)((Object)((AbstractSubscriberManager)this.self()));
    }

    protected abstract void startSubscriptions(Map<String, Set<String>> var1, Map<String, Set<String>> var2);

    protected abstract void stopSubscriptions();

    protected ITraceContextFactory getTraceFactory() {
        return this.traceFactory_;
    }

    public final synchronized void start() {
        this.setLifeCycleState(FugueLifecycleState.Starting);
        this.startSubscriptions(this.subscriptionsByTopic_, this.topicsBySubscription_);
        this.setLifeCycleState(FugueLifecycleState.Running);
    }

    public final synchronized void stop() {
        this.setLifeCycleState(FugueLifecycleState.Stopping);
        this.stopSubscriptions();
        this.setLifeCycleState(FugueLifecycleState.Stopped);
    }

    public long handleMessage(P immutableByteArray, ITraceContext trace) {
        try {
            this.consumer_.consume(immutableByteArray, trace);
        }
        catch (RetryableConsumerException e) {
            log_.warn("Unprocessable message, will retry", (Throwable)e);
            if (e.getRetryTime() == null || e.getRetryTimeUnit() == null) {
                return FAILED_CONSUMER_RETRY_TIME;
            }
            return e.getRetryTimeUnit().toMillis(e.getRetryTime());
        }
        catch (RuntimeException e) {
            log_.warn("Unprocessable message, will retry", (Throwable)e);
            return FAILED_CONSUMER_RETRY_TIME;
        }
        catch (FatalConsumerException e) {
            log_.error("Unprocessable message, aborted", (Throwable)e);
            trace.trace("MESSAGE_IS_UNPROCESSABLE");
            try {
                this.unprocessableMessageConsumer_.consume(immutableByteArray, trace);
            }
            catch (RuntimeException e2) {
                log_.error("Unprocessable message consumer failed", (Throwable)e);
                return FAILED_DEAD_LETTER_RETRY_TIME;
            }
        }
        return -1L;
    }
}

