/*
 * Decompiled with CFR 0.152.
 */
package stream.monitor;

import java.text.DecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.AbstractProcessor;
import stream.data.Data;
import stream.data.Statistics;
import stream.statistics.StatisticsService;

public class DataRate
extends AbstractProcessor
implements StatisticsService {
    final DecimalFormat fmt = new DecimalFormat("0.000");
    static Logger log = LoggerFactory.getLogger(DataRate.class);
    String clock = null;
    Long count = 0L;
    Long start = null;
    Long windowCount = 0L;
    Long last = 0L;
    Double elapsed = 0.0;
    Double rate = new Double(0.0);
    Integer every = null;
    String key = "dataRate";

    public String getClock() {
        return this.clock;
    }

    public void setClock(String clock) {
        this.clock = clock;
    }

    public String getKey() {
        return this.key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    @Override
    public Data process(Data input) {
        Long sec;
        Long l;
        Long l2;
        if (this.start == null) {
            this.start = System.currentTimeMillis();
        }
        Long now = System.currentTimeMillis();
        if (this.clock != null) {
            now = new Long(input.get(this.clock) + "");
            if (this.last == 0L) {
                this.last = now;
            }
        }
        Double seconds = (double)Math.abs(this.last - now) / 1000.0;
        if (now > this.last) {
            this.elapsed = this.elapsed + seconds;
            this.rate = (double)this.windowCount.longValue() / seconds;
            this.last = now;
            this.windowCount = 1L;
            if (this.key != null) {
                input.put("time", new Double(this.elapsed));
                input.put(this.key, new Double(this.rate));
            }
        } else {
            l2 = this.windowCount;
            l = this.windowCount = Long.valueOf(this.windowCount + 1L);
        }
        if (this.every != null && this.count % (long)this.every.intValue() == 0L && this.start < now && (sec = Long.valueOf((now - this.start) / 1000L)) > 0L) {
            log.info("{} items processed, data-rate is: {}/second", this.count, (Object)this.fmt.format(this.count.doubleValue() / sec.doubleValue()));
        }
        l2 = this.count;
        l = this.count = Long.valueOf(this.count + 1L);
        return input;
    }

    @Override
    public void reset() throws Exception {
        this.count = 0L;
        this.windowCount = 1L;
        this.last = 0L;
    }

    @Override
    public Statistics getStatistics() {
        Statistics st = new Statistics();
        st.put("dataRate", this.rate);
        return st;
    }

    public Integer getEvery() {
        return this.every;
    }

    public void setEvery(Integer every) {
        this.every = every;
    }
}

