/*
 * Decompiled with CFR 0.152.
 */
package stream.runtime;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.ProcessContext;
import stream.Processor;
import stream.StatefulProcessor;
import stream.io.QueueService;
import stream.runtime.ProcessListener;

/**
 * Base class for a process thread that repeatedly obtains an item via
 * {@link #getNextItem()}, passes it through the registered chain of
 * {@link Processor}s and, if an output queue is set, enqueues the result.
 *
 * <p>Registered {@link ProcessListener}s are notified when {@link #run()}
 * starts and when it finishes.
 */
public abstract class AbstractProcess
extends Thread
implements Runnable,
Processor {
    static Logger log = LoggerFactory.getLogger(AbstractProcess.class);

    /**
     * Loop flag of {@link #run()}. Declared {@code volatile} because
     * {@link #finish()} clears it and then interrupts this thread, so it is
     * presumably written from threads other than the one executing the loop.
     */
    protected volatile boolean running = true;

    /** Context handed to {@link #init(ProcessContext)}. */
    protected ProcessContext context;

    // NOTE(review): interval and intervalString are not read anywhere in this
    // class; presumably used by subclasses in this package -- confirm.
    Long interval = 1000L;
    String intervalString = "1000ms";

    /** Listeners notified at the start and at the end of {@link #run()}. */
    protected final List<ProcessListener> processListener = new ArrayList<ProcessListener>();

    /** Processors applied, in insertion order, to every item. */
    protected final List<Processor> processors = new ArrayList<Processor>();

    /** Number of non-null items read and passed to {@link #process(Data)} by {@link #run()}. */
    protected Long count = 0L;

    // NOTE(review): lastItem is never assigned in this class.
    protected Data lastItem = null;

    /** Optional queue receiving the processed items; may be {@code null}. */
    protected QueueService outputQueue;

    /**
     * Provides the next item to be processed.
     *
     * @return the next item, or {@code null} to make {@link #run()} stop
     */
    public abstract Data getNextItem();

    /**
     * Passes the given item through all registered processors in order.
     * Each processor receives the output of its predecessor.
     *
     * @param input the item to process
     * @return the output of the last processor, or {@code null} as soon as
     *         any processor returns {@code null} (remaining processors are
     *         skipped in that case)
     */
    @Override
    public Data process(Data input) {
        log.debug("processing data {}", input);
        Data data = input;
        for (Processor proc : this.processors) {
            data = proc.process(data);
            if (data == null) {
                return null;
            }
        }
        return data;
    }

    /**
     * Stores the context and initializes every registered processor that is
     * a {@link StatefulProcessor}.
     *
     * @param context the context to store and to hand to the processors
     * @throws Exception if a stateful processor fails to initialize
     */
    public void init(ProcessContext context) throws Exception {
        this.context = context;
        for (Processor proc : this.processors) {
            if (proc instanceof StatefulProcessor) {
                ((StatefulProcessor)proc).init(context);
            }
        }
    }

    /**
     * Stops the processing loop, finishes every registered
     * {@link StatefulProcessor} and interrupts this thread.
     *
     * <p>A failure while finishing one processor is logged and does not
     * prevent the remaining processors from being finished.
     *
     * <p>Note that {@link #run()} calls this method itself once its loop
     * has ended, so a process stopped through an external call to this
     * method will have its processors finished a second time by
     * {@code run()}.
     *
     * @throws Exception declared for subclasses; this implementation logs
     *         exceptions raised by the processors instead of propagating them
     */
    public void finish() throws Exception {
        log.debug("Finishing process...");
        this.running = false;
        try {
            for (Processor proc : this.processors) {
                if (!(proc instanceof StatefulProcessor)) {
                    continue;
                }
                try {
                    log.debug("Finishing processor {}", proc);
                    ((StatefulProcessor)proc).finish();
                }
                catch (Exception e) {
                    log.error("Failed to finish processor '{}': {}", proc, e.getMessage());
                    // Stack trace only at debug level, as before.
                    log.debug("Stack trace of the failed processor shutdown:", e);
                }
            }
        }
        catch (Exception e) {
            // Only reachable for failures of the iteration itself.
            log.error("Unexpected error while finishing processors", e);
        }
        this.interrupt();
    }

    /**
     * Main loop: notifies the listeners, then reads, processes and forwards
     * items until {@link #getNextItem()} returns {@code null}, the
     * {@link #running} flag is cleared or an exception occurs. Afterwards
     * {@link #finish()} is called and the listeners are notified again.
     */
    @Override
    public void run() {
        for (ProcessListener listener : this.processListener) {
            listener.processStarted(this);
        }

        try {
            while (this.running) {
                Data item = this.getNextItem();
                if (item == null) {
                    log.debug("No more items could be read, exiting this process.");
                    this.running = false;
                    break;
                }
                item = this.process(item);
                // process() returns null when a processor dropped the item;
                // in that case there is nothing to forward.
                if (item != null && this.outputQueue != null) {
                    log.debug("Sending process output to connected output-queue {}", this.outputQueue);
                    this.outputQueue.enqueue(item);
                }
                // Every item that was read counts, whether dropped or not.
                this.count = this.count + 1L;
            }
        }
        catch (Exception e) {
            log.error("Aborting process due to errors: {}", e.getMessage());
            log.debug("Stack trace of the aborting error:", e);
            this.running = false;
        }

        try {
            this.finish();
        }
        catch (Exception e) {
            log.warn("Error while finishing process: {}", e.getMessage());
            log.debug("Stack trace of the error while finishing:", e);
        }

        for (ProcessListener listener : this.processListener) {
            listener.processFinished(this);
        }
    }

    /**
     * @return the context stored by {@link #init(ProcessContext)}, or
     *         {@code null} if {@code init} has not been called
     */
    public ProcessContext getContext() {
        return this.context;
    }

    /**
     * Appends a processor to the end of the chain.
     *
     * @param p the processor to add
     */
    public void addProcessor(Processor p) {
        this.processors.add(p);
    }

    /**
     * Removes a processor from the chain, if present.
     *
     * @param p the processor to remove
     */
    public void removeProcessor(Processor p) {
        this.processors.remove(p);
    }

    /**
     * @return the live, mutable list of processors backing this process
     *         (not a copy)
     */
    public List<Processor> getProcessors() {
        return this.processors;
    }

    /**
     * @return the number of items {@link #run()} has read and processed so far
     */
    public Long getNumberOfItemsProcessed() {
        return this.count;
    }

    /**
     * @return the current value of the {@link #running} flag; it is
     *         {@code true} from construction on, i.e. also before the
     *         thread has been started
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * @return the canonical class name followed by the {@link Thread}
     *         description in square brackets
     */
    @Override
    public String toString() {
        return this.getClass().getCanonicalName() + "[" + super.toString() + "]";
    }

    /**
     * Registers a listener to be notified about start and end of this process.
     *
     * @param l the listener to add
     */
    public void addListener(ProcessListener l) {
        this.processListener.add(l);
    }

    /**
     * Unregisters a listener, if present.
     *
     * @param l the listener to remove
     */
    public void removeListener(ProcessListener l) {
        this.processListener.remove(l);
    }
}

