/*
 * Decompiled with CFR 0.152.
 */
package stream.io;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.data.DataFactory;
import stream.io.AbstractStream;
import stream.io.SourceURL;

public class TransportStream
extends AbstractStream {
    static Logger log = LoggerFactory.getLogger(TransportStream.class);
    InputStream stream;
    Long sequenceId = 0L;
    long offset = 0L;
    Thread prefetcher;
    final LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue(10000);
    AtomicBoolean closed = new AtomicBoolean(false);
    static final byte PAYLOAD_BIT = 16;
    static final byte ADAPTION_BIT = 32;
    static final int SCRAMBLE_BIT = 128;
    static final int ADAPTION_DISCONTINUITY = 128;
    static final byte ADAPTION_RANDOM_ACCESS = 64;
    static final byte ADAPTION_ELEMENTARY_STREAM_PRIO = 32;
    static final byte ADAPTION_PCR = 16;
    static final byte ADAPTION_OPCR = 8;

    public TransportStream(SourceURL url) {
        super(url);
    }

    public void init() throws Exception {
        super.init();
        this.stream = this.getInputStream();
        this.prefetcher = new Thread(){

            @Override
            public void run() {
                while (!TransportStream.this.closed.get()) {
                    try {
                        int adaptionSize;
                        byte flags;
                        byte[] packet = new byte[188];
                        int read = 0;
                        while (!TransportStream.this.closed.get() && read < 188) {
                            int bytes = TransportStream.this.stream.read(packet, read, packet.length - read);
                            log.debug("Read {} bytes from stream...", (Object)bytes);
                            if (bytes <= 0) continue;
                            read += bytes;
                            TransportStream.this.offset += (long)bytes;
                        }
                        int mask = 31;
                        int pid = ((mask & packet[1]) << 8) + packet[2];
                        if (pid == 0) {
                            log.debug("packet {} has PID 0, offset is {}", (Object)TransportStream.this.sequenceId, (Object)TransportStream.this.offset);
                            log.debug("   total of {}k read", (Object)(TransportStream.this.offset / 1024L));
                        }
                        if (((flags = packet[3]) & 0x80) > 0) {
                            log.debug("scrambling found!");
                        }
                        if ((flags & 0x20) <= 0 || (adaptionSize = Byte.valueOf(packet[4]).intValue()) > 0) {
                            // empty if block
                        }
                        Data item = DataFactory.create();
                        item.put((Object)"packet:data", (Object)packet);
                        item.put((Object)"packet:pid", (Object)pid);
                        item.put((Object)"packet:offset", (Object)(TransportStream.this.offset - 188L));
                        Long l = TransportStream.this.sequenceId;
                        Long l2 = TransportStream.this.sequenceId = Long.valueOf(TransportStream.this.sequenceId + 1L);
                        item.put((Object)"packet:sequence", (Object)l);
                        item.put((Object)"packet:source:url", (Object)(TransportStream.this.getUrl() + ""));
                        while (TransportStream.this.queue.remainingCapacity() < 1) {
                            log.debug("Waiting for queue to drain a bit...");
                            Thread.sleep(100L);
                        }
                        TransportStream.this.queue.add(item);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Prefetcher thread finished.");
            }
        };
        this.prefetcher.setDaemon(true);
        this.prefetcher.start();
    }

    public void close() throws Exception {
        log.info("Closing TransportStream...");
        super.close();
        this.closed.set(true);
        while (this.prefetcher.isAlive()) {
            log.info("Waiting for pre-fetcher thread to finish...");
            try {
                this.prefetcher.interrupt();
                this.prefetcher.join();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public Data readNext() throws Exception {
        Data item = this.queue.take();
        return item;
    }

    public static void main(String[] args) throws Exception {
        TransportStream stream = new TransportStream(new SourceURL("http://cb00.virtual/rtl.ts"));
        stream.setLimit(100L);
        stream.init();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Data item = stream.read();
        int items = 0;
        long bytesWritten = 0L;
        while (item != null) {
            int pid = (Integer)item.get((Object)"packet:pid");
            if (pid == 0 && bytesWritten > 0L) {
                log.info("Closing old block...");
                baos.close();
                File file = new File("/Volumes/RamDisk/video-block-" + System.currentTimeMillis() + ".ts");
                FileOutputStream fos = new FileOutputStream(file);
                fos.write(baos.toByteArray());
                fos.close();
                log.info("Write TS chunk of size {} to {}", (Object)bytesWritten, (Object)file);
                baos = new ByteArrayOutputStream();
                bytesWritten = 0L;
            }
            if (baos != null) {
                byte[] data = (byte[])item.get((Object)"packet:data");
                baos.write(data);
                bytesWritten += (long)data.length;
            }
            ++items;
            item = stream.read();
        }
        if (baos != null) {
            baos.close();
        }
        log.info("{} items read.", (Object)items);
        stream.close();
    }
}

