/*
 * Decompiled with CFR 0.152.
 */
package stream.runtime;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Context;
import stream.Data;
import stream.Process;
import stream.ProcessContext;
import stream.Processor;
import stream.StatefulProcessor;
import stream.io.Sink;
import stream.io.Source;
import stream.runtime.ProcessContextImpl;

public abstract class AbstractProcess
implements Process {
    static Logger log = LoggerFactory.getLogger(AbstractProcess.class);
    protected Context parentContext;
    protected ProcessContext processContext;
    protected Source source;
    protected Sink sink;
    protected final List<Processor> processors = new ArrayList<Processor>();
    int[] counts;
    long[] millis;

    @Override
    public void setSource(Source ds) {
        this.source = ds;
    }

    @Override
    public Source getSource() {
        return this.source;
    }

    @Override
    public void setSink(Sink sink) {
        this.sink = sink;
    }

    @Override
    public Sink getSink() {
        return this.sink;
    }

    public Data process(Data input) {
        Data data = input;
        log.debug("processing data {}", (Object)input);
        int i = 0;
        for (Processor proc : this.processors) {
            long start = System.currentTimeMillis();
            data = proc.process(data);
            int n = i;
            this.counts[n] = this.counts[n] + 1;
            int n2 = i++;
            this.millis[n2] = this.millis[n2] + (System.currentTimeMillis() - start);
            if (data != null) continue;
            return null;
        }
        return data;
    }

    @Override
    public void init(Context context) throws Exception {
        this.parentContext = context;
        this.processContext = new ProcessContextImpl(context);
        for (Processor proc : this.processors) {
            if (!(proc instanceof StatefulProcessor)) continue;
            ((StatefulProcessor)proc).init(this.processContext);
        }
        log.debug("Process {} (source: {}) initialized, processors: ", (Object)this, (Object)this.getSource());
        int i = 0;
        for (Processor proc : this.processors) {
            log.debug("   {}", (Object)proc);
            this.counts[i] = 0;
            this.millis[i] = 0L;
            ++i;
        }
    }

    @Override
    public void finish() throws Exception {
        log.debug("Finishing process {} (source: {})...", (Object)this, (Object)this.getSource().getId());
        try {
            int i = 0;
            for (Processor proc : this.processors) {
                if (proc instanceof StatefulProcessor) {
                    try {
                        log.debug("Finishing processor {}", (Object)proc);
                        ((StatefulProcessor)proc).finish();
                    }
                    catch (Exception e) {
                        log.error("Failed to finish processor '{}': {}", (Object)proc, (Object)e.getMessage());
                        e.printStackTrace();
                    }
                }
                log.debug("processor {} processed {} items", (Object)proc, (Object)this.counts[i]);
                log.debug("   average time is {} ms/item", (Object)((double)this.millis[i] / (double)this.counts[i]));
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute() {
        try {
            Data item = this.getSource().read();
            while (item != null) {
                item = this.process(item);
                if (this.getSink() != null) {
                    log.debug("Sending process output to connected sink {}", (Object)this.getSink());
                    this.getSink().write(item);
                }
                item = this.getSource().read();
            }
            log.debug("No more items could be read, exiting this process.");
        }
        catch (Exception e) {
            log.error("Aborting process due to errors: {}", (Object)e.getMessage());
            e.printStackTrace();
        }
        try {
            this.finish();
        }
        catch (Exception e) {
            log.warn("Error while finishing process: {}", (Object)e.getMessage());
            e.printStackTrace();
        }
    }

    public ProcessContext getContext() {
        return this.processContext;
    }

    @Override
    public void add(Processor p) {
        this.processors.add(p);
        this.counts = new int[this.processors.size()];
        this.millis = new long[this.processors.size()];
    }

    @Override
    public void remove(Processor p) {
        this.processors.remove(p);
        this.counts = new int[this.processors.size()];
        this.millis = new long[this.processors.size()];
    }

    @Override
    public List<Processor> getProcessors() {
        return this.processors;
    }

    public String toString() {
        return this.getClass().getCanonicalName() + "[" + super.toString() + "]";
    }
}

