/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.fugue.pubsub;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.symphonyoss.s2.fugue.counter.IBusyCounter;
import org.symphonyoss.s2.fugue.counter.ICounter;
import org.symphonyoss.s2.fugue.counter.ScaleAction;
import org.symphonyoss.s2.fugue.deploy.ExecutorBatch;
import org.symphonyoss.s2.fugue.pipeline.FatalConsumerException;
import org.symphonyoss.s2.fugue.pipeline.RetryableConsumerException;
import org.symphonyoss.s2.fugue.pubsub.AbstractPullSubscriberManager;
import org.symphonyoss.s2.fugue.pubsub.IPullSubscriberContext;
import org.symphonyoss.s2.fugue.pubsub.IPullSubscriberMessage;

public abstract class AbstractPullSubscriber
implements Runnable {
    private static final Logger log_ = LoggerFactory.getLogger(AbstractPullSubscriber.class);
    private final AbstractPullSubscriberManager<?, ?> manager_;
    private final String subscriptionName_;
    private final ICounter counter_;
    private final IBusyCounter busyCounter_;
    private final long extensionFrequency_;
    private boolean running_ = true;

    public AbstractPullSubscriber(AbstractPullSubscriberManager<?, ?> manager, String subscriptionName, ICounter counter, IBusyCounter busyCounter, long extensionFrequency) {
        this.manager_ = manager;
        this.subscriptionName_ = subscriptionName;
        this.counter_ = counter;
        this.busyCounter_ = busyCounter;
        this.extensionFrequency_ = extensionFrequency;
    }

    protected abstract IPullSubscriberContext getContext() throws IOException;

    protected abstract Runnable getNonIdleSubscriber();

    protected void getSomeMessages() {
        log_.info("About to read for " + this.subscriptionName_ + "...");
        try (IPullSubscriberContext context = this.getContext();){
            this.getSomeMessages(context);
        }
        catch (IOException e) {
            log_.error("Unable to pull messages", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    protected void getSomeMessages(IPullSubscriberContext context) {
        log_.info("About to read for " + this.subscriptionName_ + "...");
        try {
            Collection<IPullSubscriberMessage> messages = context.nonBlockingPull();
            if (messages.isEmpty()) {
                if (this.busyCounter_ != null && this.busyCounter_.busy(0) == ScaleAction.ScaleDown) {
                    this.stop();
                    return;
                }
                log_.info("Blocking read for " + this.subscriptionName_ + "...");
                messages = context.blockingPull();
                log_.info("Blocking read for " + this.subscriptionName_ + " returned " + messages.size());
            } else {
                if (this.busyCounter_ != null) {
                    this.busyCounter_.busy(messages.size());
                }
                if (this.isRunning()) {
                    this.manager_.submit(this.getNonIdleSubscriber(), false);
                    log_.debug("Extra schedule " + this.subscriptionName_);
                }
                log_.info("Non-Blocking read for " + this.subscriptionName_ + " returned " + messages.size());
            }
            if (messages.isEmpty()) {
                return;
            }
            ExecutorBatch batch = new ExecutorBatch((ExecutorService)this.manager_.getHandlerExecutor());
            if (this.counter_ != null) {
                this.counter_.increment(messages.size());
            }
            try {
                Collection incompleteTasks;
                for (IPullSubscriberMessage iPullSubscriberMessage : messages) {
                    log_.debug("handle message " + iPullSubscriberMessage.getMessageId());
                    batch.submit((Runnable)iPullSubscriberMessage);
                }
                do {
                    incompleteTasks = batch.waitForAllTasks(this.extensionFrequency_);
                    for (IPullSubscriberMessage message : incompleteTasks) {
                        log_.debug("extend message " + message.getMessageId());
                        message.extend();
                    }
                } while (!incompleteTasks.isEmpty());
            }
            catch (RuntimeException e) {
                void var5_13;
                Throwable throwable = e.getCause();
                if (throwable instanceof ExecutionException) {
                    Throwable throwable2 = throwable.getCause();
                }
                if (var5_13 instanceof RetryableConsumerException) {
                    throw (RetryableConsumerException)var5_13;
                }
                if (var5_13 instanceof FatalConsumerException) {
                    throw (FatalConsumerException)var5_13;
                }
                throw e;
            }
        }
        catch (RuntimeException e) {
            log_.error("Error processing message", (Throwable)e);
        }
        catch (Throwable e) {
            try {
                log_.error("Error processing message", e);
            }
            finally {
                System.exit(1);
            }
        }
        finally {
            log_.debug("Done pull request");
        }
    }

    @Override
    public void run() {
        this.run(true);
    }

    protected void run(boolean runIfIdle) {
        if (this.isRunning()) {
            if (runIfIdle) {
                try {
                    while (this.isRunning()) {
                        this.getSomeMessages();
                    }
                }
                finally {
                    if (runIfIdle && this.isRunning()) {
                        log_.error("Main subscriber thread returned, rescheduling...");
                        this.manager_.submit(this, true);
                    }
                }
            } else if (this.isRunning()) {
                this.getSomeMessages();
            }
        }
    }

    public synchronized boolean isRunning() {
        return this.running_;
    }

    public synchronized void stop() {
        this.running_ = false;
    }
}

