/*
 * Decompiled with CFR 0.152.
 */
package stream.io;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.annotations.Parameter;
import stream.data.DataFactory;
import stream.io.AbstractStream;
import stream.io.SourceURL;
import stream.util.ByteSize;

/**
 * A stream that cuts a raw byte source into chunks delimited by a fixed byte
 * signature.
 *
 * <p>Every chunk starts with the signature and extends up to (but excluding)
 * the next occurrence of the signature. Each chunk is returned as a
 * {@code byte[]} stored under {@link #key} in a freshly created {@link Data}
 * item.
 *
 * <p>Internally the {@link #buffer} is kept in "write mode" between calls:
 * the valid bytes are {@code [0, buffer.position())} and
 * {@code buffer.limit() == buffer.capacity()}. It is flipped only temporarily
 * while searching or extracting data.
 *
 * <p>Limitations: a chunk (plus the signature that terminates it) must fit into
 * the buffer, and data following the last signature of the source is never
 * emitted because no terminating signature follows it.
 */
public abstract class ByteChunkStream extends AbstractStream {
    static final Logger log = LoggerFactory.getLogger(ByteChunkStream.class);

    /** The bytes of the ASCII text {@code "GIF8"}. */
    public static final byte[] GIF_SIGNATURE = new byte[]{71, 73, 70, 56};

    /** The bytes {@code 0xFF 0xD8} (JPEG start-of-image marker). */
    public static final byte[] JPG_SIGNATURE = new byte[]{-1, -40};

    /** Not referenced inside this class; the effective default is the initial value of {@link #bufferSize}. */
    public static final int DEFAULT_BUFFER_SIZE = 32768;

    SourceURL url;
    ByteBuffer buffer;
    InputStream input;
    final ReadableByteChannel channel;
    Long frameId = 0L;
    final byte[] signature;
    /** Key under which the chunk bytes are stored in the emitted item. */
    String key = "data";
    /** Capacity, in bytes, of the buffer allocated by {@link #init()} (2 MiB by default). */
    int bufferSize = 0x200000;
    /** Wall-clock time (ms) at which the first chunk was emitted; 0 until then. */
    Long firstRead = 0L;
    /** Total number of bytes read from the channel so far. */
    Long bytesRead = 0L;
    /** Number of chunks emitted so far. */
    Long chunks = 0L;

    /**
     * Creates a stream reading from the given URL.
     *
     * @param url       the source to open
     * @param signature the byte sequence that marks the beginning of every chunk
     * @throws Exception if the URL cannot be opened
     */
    public ByteChunkStream(SourceURL url, byte[] signature) throws Exception {
        this(url.openStream(), signature);
        this.url = url;
    }

    /**
     * Creates a stream reading from the given input stream.
     *
     * @param in        the stream to read from
     * @param signature the byte sequence that marks the beginning of every chunk
     * @throws Exception declared for subclasses; not thrown for valid arguments
     */
    public ByteChunkStream(InputStream in, byte[] signature) throws Exception {
        this.input = in;
        this.channel = Channels.newChannel(in);
        this.signature = signature;
    }

    /**
     * Initializes the parent stream and allocates the direct byte buffer with
     * the configured {@link #getBufferSize() buffer size}.
     *
     * @throws Exception if the parent initialization fails
     */
    public void init() throws Exception {
        super.init();
        log.info("Using buffer size of {}k", this.bufferSize / 1024);
        this.buffer = ByteBuffer.allocateDirect(this.bufferSize);
        if (this.buffer.isDirect()) {
            log.info("ByteBuffer is using direct memory.");
        } else {
            log.info("ByteBuffer is non-direct memory.");
        }
    }

    /**
     * Closes the parent stream and the underlying channel.
     *
     * <p>The channel is closed even if closing the parent fails, and the call
     * is safe when {@link #init()} was never invoked.
     *
     * @throws Exception if closing the parent or the channel fails
     */
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (this.buffer != null) {
                this.buffer.clear();
            }
            this.channel.close();
        }
    }

    /**
     * @return the capacity in bytes used for the internal buffer
     */
    public int getBufferSize() {
        return this.bufferSize;
    }

    /**
     * Sets the capacity of the internal buffer. Takes effect the next time
     * {@link #init()} allocates the buffer.
     *
     * @param bufferSize the new buffer size
     */
    @Parameter(description="The internal buffer size of this stream.")
    public void setBufferSize(ByteSize bufferSize) {
        this.bufferSize = bufferSize.getBytes();
    }

    /**
     * Finds the first occurrence of {@code sig} that starts at or after
     * {@code from} and lies completely below {@code buffer.limit()}.
     *
     * @param sig  the byte sequence to look for
     * @param from the first buffer index to test
     * @return the index of the first match, or {@code -1} if there is none
     */
    protected int indexOf(byte[] sig, int from) {
        // '<=' so that a signature ending exactly at the limit is still found.
        final int lastCandidate = this.buffer.limit() - sig.length;
        for (int pos = from; pos <= lastCandidate; ++pos) {
            if (this.isSignatureAt(pos, sig)) {
                return pos;
            }
        }
        return -1;
    }

    /**
     * Tests whether the buffer holds {@code sig} at absolute index {@code pos}.
     *
     * @param pos the absolute buffer index at which the comparison starts
     * @param sig the byte sequence to compare against
     * @return {@code true} if every byte of {@code sig} matches
     */
    protected boolean isSignatureAt(int pos, byte[] sig) {
        for (int i = 0; i < sig.length; ++i) {
            if (this.buffer.get(pos + i) != sig[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Searches the valid part of the buffer for the stream signature.
     *
     * <p>The buffer is flipped for the duration of the search so that
     * {@link #indexOf(byte[], int)} only sees bytes that were actually read,
     * then restored to write mode.
     */
    private int locateSignature(int from) {
        final int dataEnd = this.buffer.position();
        this.buffer.flip();
        try {
            return this.indexOf(this.signature, from);
        } finally {
            // Restore write mode: limit first, since position must not exceed it.
            this.buffer.limit(this.buffer.capacity());
            this.buffer.position(dataEnd);
        }
    }

    /** Removes the first {@code count} valid bytes from the buffer (write mode in, write mode out). */
    private void discard(int count) {
        if (count <= 0) {
            return;
        }
        this.buffer.flip();
        this.buffer.position(count);
        this.buffer.compact();
    }

    /**
     * Appends bytes from the channel to the buffer, yielding while the channel
     * delivers nothing although the buffer has room.
     *
     * @return the number of bytes appended, {@code -1} at end of stream, or
     *         {@code 0} if the buffer has no free space
     */
    private int fill() throws IOException {
        int read = this.channel.read(this.buffer);
        while (read == 0 && this.buffer.hasRemaining()) {
            Thread.yield();
            read = this.channel.read(this.buffer);
        }
        if (read > 0) {
            // Only count real data; -1 (end of stream) must not reduce the total.
            this.bytesRead = this.bytesRead + (long) read;
        }
        return read;
    }

    /**
     * Reads the next chunk from the source.
     *
     * <p>Data is read from the channel only when the bytes already buffered do
     * not contain a complete chunk.
     *
     * @return an item holding the chunk bytes under {@link #key}, or
     *         {@code null} if the source ended before another complete chunk
     *         was found, or if a chunk does not fit into the buffer
     * @throws IllegalStateException if {@link #init()} has not been called
     * @throws Exception if reading from the channel fails
     */
    public synchronized Data readNext() throws Exception {
        if (this.buffer == null) {
            throw new IllegalStateException("init() must be called before readNext()");
        }
        final int sigLen = this.signature.length;

        // Phase 1: find the signature that opens the chunk.
        int start = this.locateSignature(0);
        while (start < 0) {
            // Nothing useful buffered: keep only the tail that could be the
            // beginning of a signature split across two reads.
            this.discard(Math.max(0, this.buffer.position() - (sigLen - 1)));
            if (this.fill() <= 0) {
                // End of stream (or a buffer too small to hold a signature).
                return null;
            }
            start = this.locateSignature(0);
        }
        // Drop bytes in front of the chunk so it can use the whole buffer.
        this.discard(start);

        // Phase 2: find the signature that opens the following chunk.
        int scanFrom = sigLen;
        int end = this.locateSignature(scanFrom);
        while (end < 0) {
            if (!this.buffer.hasRemaining()) {
                log.error("No chunk boundary found within {} buffered bytes, buffer size is too small.", this.buffer.capacity());
                return null;
            }
            // Positions before this index were already checked.
            scanFrom = Math.max(sigLen, this.buffer.position() - (sigLen - 1));
            if (this.fill() < 0) {
                return null;
            }
            end = this.locateSignature(scanFrom);
        }

        // Extract [0, end) and keep the remainder for the next call.
        byte[] output = new byte[end];
        this.buffer.flip();
        this.buffer.get(output);
        this.buffer.compact();

        Data instance = DataFactory.create();
        instance.put(this.key, output);

        this.chunks = this.chunks + 1L;
        if (this.firstRead == 0L) {
            this.firstRead = System.currentTimeMillis();
        } else if (this.chunks % 100L == 0L) {
            long elapsedMillis = System.currentTimeMillis() - this.firstRead;
            log.debug("Reading rate after {} chunks is {} chunks/second", this.chunks, 1000.0 * (this.chunks.doubleValue() / (double) elapsedMillis));
            log.debug("{} bytes read", this.bytesRead);
        }
        return instance;
    }
}

