/*
 * Decompiled with CFR 0.152.
 */
package org.calrissian.accumulorecipes.blobstore.impl;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.io.Text;
import org.calrissian.accumulorecipes.blobstore.BlobStore;
import org.calrissian.accumulorecipes.commons.domain.Auths;
import org.calrissian.accumulorecipes.commons.domain.StoreConfig;
import org.calrissian.mango.io.AbstractBufferedInputStream;
import org.calrissian.mango.io.AbstractBufferedOutputStream;
import org.calrissian.mango.types.LexiTypeEncoders;
import org.calrissian.mango.types.TypeEncoder;

public class AccumuloBlobStore
implements BlobStore {
    private static final TypeEncoder<Integer, String> encoder = LexiTypeEncoders.integerEncoder();
    private static final int DEFAULT_BUFFER_SIZE = 0x100000;
    private static final String DEFAULT_TABLE_NAME = "blobstore";
    private static final String DATA_CF = "DATA";
    protected final Connector connector;
    protected final String tableName;
    private final StoreConfig config;
    private final int bufferSize;

    public AccumuloBlobStore(Connector connector) throws TableExistsException, AccumuloSecurityException, AccumuloException, TableNotFoundException {
        this(connector, 0x100000);
    }

    public AccumuloBlobStore(Connector connector, String tableName, StoreConfig config) throws TableExistsException, AccumuloSecurityException, AccumuloException, TableNotFoundException {
        this(connector, tableName, config, 0x100000);
    }

    public AccumuloBlobStore(Connector connector, int bufferSize) throws TableExistsException, AccumuloSecurityException, AccumuloException, TableNotFoundException {
        this(connector, DEFAULT_TABLE_NAME, new StoreConfig(1, (long)(bufferSize * 100), 100L, 1), bufferSize);
    }

    public AccumuloBlobStore(Connector connector, String tableName, StoreConfig config, int bufferSize) throws TableExistsException, AccumuloSecurityException, AccumuloException, TableNotFoundException {
        Validate.notNull((Object)connector, (String)"Invalid connector");
        Validate.notEmpty((String)tableName, (String)"The table name must not be empty");
        Validate.notNull((Object)config, (String)"Invalid Config");
        Validate.isTrue((bufferSize > 0 ? 1 : 0) != 0, (String)"The buffer size must be greater than 0");
        this.connector = connector;
        this.tableName = tableName;
        this.config = config;
        this.bufferSize = bufferSize;
        if (!connector.tableOperations().exists(tableName)) {
            connector.tableOperations().create(tableName);
            this.configureTable(connector, tableName);
        }
    }

    protected static String generateRowId(String key, String type) {
        return StringUtils.defaultString((String)key) + "\u0000" + StringUtils.defaultString((String)type);
    }

    protected void configureTable(Connector connector, String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
    }

    protected BatchWriter getWriter() throws TableNotFoundException {
        return this.connector.createBatchWriter(this.tableName, this.config.getMaxMemory(), this.config.getMaxLatency(), this.config.getMaxWriteThreads());
    }

    protected Mutation generateMutation(String key, String type, byte[] data, int sequenceNum, long timestamp, ColumnVisibility visibility) {
        Mutation mutation = new Mutation((CharSequence)AccumuloBlobStore.generateRowId(key, type));
        mutation.put((CharSequence)DATA_CF, (CharSequence)encoder.encode((Object)sequenceNum), visibility, timestamp, new Value(data));
        return mutation;
    }

    protected OutputStream generateWriteStream(final BatchWriter writer, final String key, final String type, final long timestamp, String visibility) {
        final ColumnVisibility colVis = new ColumnVisibility(StringUtils.defaultString((String)visibility));
        return new AbstractBufferedOutputStream(this.bufferSize){
            int sequenceNum;
            {
                super(x0);
                this.sequenceNum = 0;
            }

            protected void writeBuffer(byte[] buf) throws IOException {
                if (buf.length == 0) {
                    return;
                }
                ++this.sequenceNum;
                try {
                    writer.addMutation(AccumuloBlobStore.this.generateMutation(key, type, buf, this.sequenceNum, timestamp, colVis));
                }
                catch (Exception e) {
                    throw new IOException(e);
                }
            }

            public void flush() throws IOException {
                super.flush();
                try {
                    writer.flush();
                }
                catch (MutationsRejectedException e) {
                    throw new IOException(e);
                }
            }

            public void close() throws IOException {
                super.close();
                try {
                    writer.close();
                }
                catch (MutationsRejectedException e) {
                    throw new IOException(e);
                }
            }
        };
    }

    @Override
    public OutputStream store(String key, String type, long timestamp, String visibility) {
        try {
            return this.generateWriteStream(this.getWriter(), key, type, timestamp, visibility);
        }
        catch (RuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public InputStream get(String key, String type, Auths auths) {
        Validate.notNull((Object)auths, (String)"Null authorizations");
        try {
            String rowId = AccumuloBlobStore.generateRowId(key, type);
            Scanner scanner = this.connector.createScanner(this.tableName, auths.getAuths());
            scanner.setRange(Range.exact((CharSequence)rowId, (CharSequence)DATA_CF));
            scanner.fetchColumnFamily(new Text(DATA_CF));
            final Iterator iterator = scanner.iterator();
            return new AbstractBufferedInputStream(){

                protected boolean isEOF() {
                    return !iterator.hasNext();
                }

                protected byte[] getNextBuffer() throws IOException {
                    if (iterator.hasNext()) {
                        return ((Value)((Map.Entry)iterator.next()).getValue()).get();
                    }
                    return null;
                }
            };
        }
        catch (RuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

