/*
 * Decompiled with CFR 0.152.
 */
package org.calrissian.accumulorecipes.featurestore.impl;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.calrissian.accumulorecipes.commons.domain.Auths;
import org.calrissian.accumulorecipes.commons.domain.StoreConfig;
import org.calrissian.accumulorecipes.commons.support.Scanners;
import org.calrissian.accumulorecipes.commons.support.TimeUnit;
import org.calrissian.accumulorecipes.commons.support.TimestampUtil;
import org.calrissian.accumulorecipes.featurestore.FeatureStore;
import org.calrissian.accumulorecipes.featurestore.model.Feature;
import org.calrissian.accumulorecipes.featurestore.support.FeatureRegistry;
import org.calrissian.accumulorecipes.featurestore.support.FeatureTransform;
import org.calrissian.accumulorecipes.featurestore.support.Utilities;
import org.calrissian.accumulorecipes.featurestore.support.config.AccumuloFeatureConfig;
import org.calrissian.mango.collect.CloseableIterable;
import org.calrissian.mango.collect.CloseableIterables;

public class AccumuloFeatureStore
implements FeatureStore {
    public static final String DEFAULT_TABLE_NAME = "features";
    public static final String REVERSE_SUFFIX = "_reverse";
    protected static final StoreConfig DEFAULT_STORE_CONFIG = new StoreConfig(1, 100000L, 100L, 10);
    private final Connector connector;
    private final StoreConfig config;
    private final String tableName;
    private BatchWriter groupWriter;
    private BatchWriter typeWriter;
    private boolean isInitialized = false;
    protected final FeatureRegistry registry;

    public AccumuloFeatureStore(Connector connector) throws TableNotFoundException, TableExistsException, AccumuloSecurityException, AccumuloException {
        this(connector, DEFAULT_TABLE_NAME, DEFAULT_STORE_CONFIG, FeatureRegistry.BASE_FEATURES);
    }

    public AccumuloFeatureStore(Connector connector, String tableName, StoreConfig config, FeatureRegistry featureRegistry) throws TableNotFoundException, TableExistsException, AccumuloSecurityException, AccumuloException {
        Preconditions.checkNotNull((Object)connector);
        Preconditions.checkNotNull((Object)tableName);
        Preconditions.checkNotNull((Object)config);
        this.connector = connector;
        this.tableName = tableName;
        this.config = config;
        this.registry = featureRegistry;
    }

    public void initialize() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException {
        this.createTable(this.tableName);
        this.createTable(this.tableName + REVERSE_SUFFIX);
        this.groupWriter = this.connector.createBatchWriter(this.tableName, this.config.getMaxMemory(), this.config.getMaxLatency(), this.config.getMaxWriteThreads());
        this.typeWriter = this.connector.createBatchWriter(this.tableName + REVERSE_SUFFIX, this.config.getMaxMemory(), this.config.getMaxLatency(), this.config.getMaxWriteThreads());
        this.isInitialized = true;
    }

    private void createTable(String tableName) throws TableExistsException, AccumuloSecurityException, AccumuloException, TableNotFoundException {
        int priority = 15;
        if (!this.connector.tableOperations().exists(tableName)) {
            this.connector.tableOperations().create(tableName, false);
            for (AccumuloFeatureConfig featureConfig : this.registry.getConfigs()) {
                List<IteratorSetting> settings = featureConfig.buildIterators(priority);
                int numSettings = 0;
                for (IteratorSetting setting : settings) {
                    ++numSettings;
                    this.connector.tableOperations().attachIterator(tableName, setting);
                }
                priority += numSettings;
            }
        }
    }

    protected ScannerBase metricScanner(AccumuloFeatureConfig xform, Date start, Date end, String group, Iterable<String> types, String name, TimeUnit timeUnit, Auths auths) {
        Preconditions.checkNotNull((Object)xform);
        try {
            group = StringUtils.defaultString((String)group);
            timeUnit = timeUnit == null ? TimeUnit.MINUTES : timeUnit;
            BatchScanner scanner = this.connector.createBatchScanner(this.tableName + REVERSE_SUFFIX, auths.getAuths(), this.config.getMaxQueryThreads());
            ArrayList<Range> typeRanges = new ArrayList<Range>();
            for (String type : types) {
                typeRanges.add(this.buildRange(type, start, end, timeUnit));
            }
            scanner.setRanges(typeRanges);
            scanner.fetchColumn(new Text(Utilities.combine(timeUnit.toString(), xform.featureName())), new Text(Utilities.combine(group, name)));
            return scanner;
        }
        catch (TableNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    protected ScannerBase metricScanner(AccumuloFeatureConfig xform, Date start, Date end, String group, String type, String name, TimeUnit timeUnit, Auths auths) {
        Preconditions.checkNotNull((Object)xform);
        try {
            group = StringUtils.defaultString((String)group);
            timeUnit = timeUnit == null ? TimeUnit.MINUTES : timeUnit;
            Scanner scanner = this.connector.createScanner(this.tableName + REVERSE_SUFFIX, auths.getAuths());
            scanner.setRange(this.buildRange(type, start, end, timeUnit));
            if (group != null && name != null) {
                scanner.fetchColumn(new Text(Utilities.combine(timeUnit.toString(), xform.featureName())), new Text(Utilities.combine(group, name)));
            } else {
                scanner.fetchColumnFamily(new Text(Utilities.combine(timeUnit.toString(), xform.featureName())));
                String cqRegex = null;
                if (group != null) {
                    cqRegex = Utilities.combine(group, "(.*)");
                } else if (name != null) {
                    cqRegex = Utilities.combine("(.*)", name);
                }
                if (cqRegex != null) {
                    IteratorSetting regexIterator = new IteratorSetting(14, "regex", RegExFilter.class);
                    scanner.addScanIterator(regexIterator);
                }
            }
            return scanner;
        }
        catch (TableNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    protected Range buildRange(String type, Date start, Date end, TimeUnit timeUnit) {
        return new Range((CharSequence)Utilities.combine(type, TimestampUtil.generateTimestamp((long)end.getTime(), (TimeUnit)timeUnit)), (CharSequence)Utilities.combine(type, TimestampUtil.generateTimestamp((long)start.getTime(), (TimeUnit)timeUnit)));
    }

    public void shutdown() throws MutationsRejectedException {
        this.groupWriter.close();
        this.typeWriter.close();
    }

    @Override
    public void save(Iterable<? extends Feature> featureData) {
        this.save(featureData, Arrays.asList(TimeUnit.values()));
    }

    @Override
    public void save(Iterable<? extends Feature> featureData, Iterable<TimeUnit> timeUnits) {
        if (!this.isInitialized) {
            throw new RuntimeException("Please called initialize() on the store first");
        }
        try {
            for (Feature feature : featureData) {
                AccumuloFeatureConfig featureConfig = this.registry.transformForClass(feature.getClass());
                if (featureConfig == null) {
                    throw new RuntimeException("Skipping unknown model type: " + feature.getClass());
                }
                String group = StringUtils.defaultString((String)feature.getGroup());
                String type = StringUtils.defaultString((String)feature.getType());
                String name = StringUtils.defaultString((String)feature.getName());
                ColumnVisibility visibility = new ColumnVisibility(StringUtils.defaultString((String)feature.getVisibility()));
                for (TimeUnit timeUnit : timeUnits) {
                    String timestamp = TimestampUtil.generateTimestamp((long)feature.getTimestamp(), (TimeUnit)timeUnit);
                    Mutation group_mutation = new Mutation((CharSequence)Utilities.combine(group, timestamp));
                    Mutation type_mutation = new Mutation((CharSequence)Utilities.combine(type, timestamp));
                    group_mutation.put((CharSequence)Utilities.combine(timeUnit.toString(), featureConfig.featureName()), (CharSequence)Utilities.combine(type, name), visibility, feature.getTimestamp(), featureConfig.buildValue(feature));
                    type_mutation.put((CharSequence)Utilities.combine(timeUnit.toString(), featureConfig.featureName()), (CharSequence)Utilities.combine(group, name), visibility, feature.getTimestamp(), featureConfig.buildValue(feature));
                    this.groupWriter.addMutation(group_mutation);
                    this.typeWriter.addMutation(type_mutation);
                }
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void flush() throws Exception {
        this.groupWriter.flush();
        this.typeWriter.flush();
    }

    @Override
    public <T extends Feature> CloseableIterable<T> query(Date start, Date end, String group, String type, String name, TimeUnit timeUnit, Class<T> featureType, Auths auths) {
        if (!this.isInitialized) {
            throw new RuntimeException("Please call initialize() on the store first.");
        }
        AccumuloFeatureConfig featureConfig = this.registry.transformForClass(featureType);
        return CloseableIterables.transform((CloseableIterable)Scanners.closeableIterable((ScannerBase)this.metricScanner(featureConfig, start, end, group, type, name, timeUnit, auths)), this.buildFeatureTransform(featureConfig));
    }

    @Override
    public <T extends Feature> CloseableIterable<T> query(Date start, Date end, String group, Set<String> types, String name, TimeUnit timeUnit, Class<T> featureType, Auths auths) {
        if (!this.isInitialized) {
            throw new RuntimeException("Please call initialize() on the store first.");
        }
        AccumuloFeatureConfig featureConfig = this.registry.transformForClass(featureType);
        return CloseableIterables.transform((CloseableIterable)Scanners.closeableIterable((ScannerBase)this.metricScanner(featureConfig, start, end, group, types, name, timeUnit, auths)), this.buildFeatureTransform(featureConfig));
    }

    protected <T extends Feature> FeatureTransform<T> buildFeatureTransform(final AccumuloFeatureConfig xform) {
        return new FeatureTransform<T>(){

            @Override
            protected T transform(long timestamp, String group, String type, String name, String visibility, Value value) {
                return (Feature)xform.buildFeatureFromValue(timestamp, group, type, name, visibility, value);
            }
        };
    }
}

