/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.contrib.bkjournal;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.contrib.bkjournal.EditLogLedgerMetadata;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.StreamLimiter;

class BookKeeperEditLogInputStream
extends EditLogInputStream {
    static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class);
    private final long firstTxId;
    private final long lastTxId;
    private final int logVersion;
    private final boolean inProgress;
    private final LedgerHandle lh;
    private final FSEditLogOp.Reader reader;
    private final FSEditLogLoader.PositionTrackingInputStream tracker;

    BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata) throws IOException {
        this(lh, metadata, 0L);
    }

    BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata, long firstBookKeeperEntry) throws IOException {
        this.lh = lh;
        this.firstTxId = metadata.getFirstTxId();
        this.lastTxId = metadata.getLastTxId();
        this.logVersion = metadata.getDataLayoutVersion();
        this.inProgress = metadata.isInProgress();
        if (firstBookKeeperEntry < 0L || firstBookKeeperEntry > lh.getLastAddConfirmed()) {
            throw new IOException("Invalid first bk entry to read: " + firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed());
        }
        BufferedInputStream bin = new BufferedInputStream(new LedgerInputStream(lh, firstBookKeeperEntry));
        this.tracker = new FSEditLogLoader.PositionTrackingInputStream((InputStream)bin);
        DataInputStream in = new DataInputStream((InputStream)this.tracker);
        this.reader = new FSEditLogOp.Reader(in, (StreamLimiter)this.tracker, this.logVersion);
    }

    public long getFirstTxId() {
        return this.firstTxId;
    }

    public long getLastTxId() {
        return this.lastTxId;
    }

    public int getVersion(boolean verifyVersion) throws IOException {
        return this.logVersion;
    }

    protected FSEditLogOp nextOp() throws IOException {
        return this.reader.readOp(false);
    }

    public void close() throws IOException {
        try {
            this.lh.close();
        }
        catch (BKException e) {
            throw new IOException("Exception closing ledger", e);
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted closing ledger", e);
        }
    }

    public long getPosition() {
        return this.tracker.getPos();
    }

    public long length() throws IOException {
        return this.lh.getLength();
    }

    public String getName() {
        return String.format("BookKeeperLedger[ledgerId=%d,firstTxId=%d,lastTxId=%d]", this.lh.getId(), this.firstTxId, this.lastTxId);
    }

    public boolean isInProgress() {
        return this.inProgress;
    }

    public void skipTo(long txId) throws IOException {
        long numToSkip = this.getFirstTxId() - txId;
        FSEditLogOp op = null;
        for (long i = 0L; i < numToSkip; ++i) {
            op = this.readOp();
        }
        if (op != null && op.getTransactionId() != txId - 1L) {
            throw new IOException("Corrupt stream, expected txid " + (txId - 1L) + ", got " + op.getTransactionId());
        }
    }

    public String toString() {
        return "BookKeeperEditLogInputStream {" + this.getName() + "}";
    }

    public void setMaxOpSize(int maxOpSize) {
        this.reader.setMaxOpSize(maxOpSize);
    }

    public boolean isLocalLog() {
        return false;
    }

    private static class LedgerInputStream
    extends InputStream {
        private long readEntries;
        private InputStream entryStream = null;
        private final LedgerHandle lh;
        private final long maxEntry;

        LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry) throws IOException {
            this.lh = lh;
            this.readEntries = firstBookKeeperEntry;
            this.maxEntry = lh.getLastAddConfirmed();
        }

        private InputStream nextStream() throws IOException {
            try {
                if (this.readEntries > this.maxEntry) {
                    return null;
                }
                Enumeration entries = this.lh.readEntries(this.readEntries, this.readEntries);
                ++this.readEntries;
                if (entries.hasMoreElements()) {
                    LedgerEntry e = (LedgerEntry)entries.nextElement();
                    assert (!entries.hasMoreElements());
                    return e.getEntryInputStream();
                }
            }
            catch (BKException e) {
                throw new IOException("Error reading entries from bookkeeper", e);
            }
            catch (InterruptedException e) {
                throw new IOException("Interrupted reading entries from bookkeeper", e);
            }
            return null;
        }

        @Override
        public int read() throws IOException {
            byte[] b = new byte[1];
            if (this.read(b, 0, 1) != 1) {
                return -1;
            }
            return b[0];
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int read = 0;
            if (this.entryStream == null) {
                this.entryStream = this.nextStream();
                if (this.entryStream == null) {
                    return read;
                }
            }
            while (read < len) {
                int thisread = this.entryStream.read(b, off + read, len - read);
                if (thisread == -1) {
                    this.entryStream = this.nextStream();
                    if (this.entryStream != null) continue;
                    return read;
                }
                read += thisread;
            }
            return read;
        }
    }
}

