/*
 * Decompiled with CFR 0.152.
 */
package stream.io;

import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.Processor;
import stream.ProcessorList;
import stream.annotations.Description;
import stream.annotations.Parameter;
import stream.data.DataFactory;
import stream.io.CsvStream;
import stream.io.DataStream;

@Description(name="Stream Process (exec)", group="Data Stream.Sources")
public class ProcessStream
implements DataStream {
    static Logger log = LoggerFactory.getLogger(ProcessStream.class);
    final ProcessorList processors = new ProcessorList();
    protected Process process;
    protected DataStream dataStream;
    protected Class<? extends DataStream> dataStreamClass = CsvStream.class;
    protected String format;
    protected String command;
    protected String id;

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    @Override
    public Map<String, Class<?>> getAttributes() {
        return null;
    }

    @Override
    public Data readNext() throws Exception {
        return this.readNext(DataFactory.create());
    }

    public String getCommand() {
        return this.command;
    }

    @Parameter(required=true, description="The command to execute. This command will be spawned and is assumed to output data to standard output.")
    public void setCommand(String command) {
        this.command = command;
    }

    public String getFormat() {
        return this.format;
    }

    @Override
    public void init() throws Exception {
        Runtime runtime = Runtime.getRuntime();
        this.process = runtime.exec(this.command);
        this.dataStreamClass = Class.forName(this.format);
        Constructor<? extends DataStream> stream = this.dataStreamClass.getConstructor(InputStream.class);
        InputStream input = this.process.getInputStream();
        this.dataStream = stream.newInstance(input);
    }

    @Parameter(required=true, values={"stream.io.CsvStream", "stream.io.SvmLight", "stream.io.JSONStream", "stream.io.LineStream"}, defaultValue="stream.io.CsvStream", description="The format of the input (standard input), defaults to CSV")
    public void setFormat(String format) {
        this.format = format;
    }

    @Override
    public Data readNext(Data datum) throws Exception {
        if (this.process == null) {
            this.init();
        }
        return this.dataStream.readNext(datum);
    }

    @Override
    public void close() {
        if (this.process != null) {
            this.process.destroy();
        }
    }

    @Override
    public List<Processor> getPreprocessors() {
        return this.processors.getProcessors();
    }
}

