/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.newts.persistence.cassandra;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.querybuilder.Batch;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Future;
import javax.inject.Inject;
import javax.inject.Named;
import org.opennms.newts.aggregate.IntervalGenerator;
import org.opennms.newts.aggregate.ResultProcessor;
import org.opennms.newts.api.Duration;
import org.opennms.newts.api.Measurement;
import org.opennms.newts.api.Resource;
import org.opennms.newts.api.Results;
import org.opennms.newts.api.Sample;
import org.opennms.newts.api.SampleProcessorService;
import org.opennms.newts.api.SampleRepository;
import org.opennms.newts.api.Timestamp;
import org.opennms.newts.api.ValueType;
import org.opennms.newts.api.query.ResultDescriptor;
import org.opennms.newts.cassandra.CassandraSession;
import org.opennms.newts.persistence.cassandra.ConcurrentResultWrapper;
import org.opennms.newts.persistence.cassandra.DriverAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SampleRepository} implementation that reads and writes rows of the {@code samples}
 * table through a {@link CassandraSession}.
 *
 * <p>Rows are addressed by a {@code partition} value obtained by flooring the sample timestamp
 * to the resource shard duration, together with the resource id. Range queries issue one
 * asynchronous statement per partition between the floored start and the floored end.
 *
 * <p>Each public operation is timed with its own {@link Timer}; the timer context is stopped
 * whether the operation completes normally or throws.
 */
public class CassandraSampleRepository
implements SampleRepository {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSampleRepository.class);

    /** Look-back window, in seconds, used when a query does not supply a start time. */
    private static final long DEFAULT_QUERY_WINDOW_SECONDS = 86400L;

    /** Initial resource shard duration, in seconds; replaceable via {@link #setResourceShard(Duration)}. */
    private static final long DEFAULT_RESOURCE_SHARD_SECONDS = 600000L;

    private final CassandraSession m_session;
    private final int m_ttl;
    /** Optional downstream consumer of inserted samples; {@code null} means no submission. */
    private final SampleProcessorService m_processorService;
    private final PreparedStatement m_selectStatement;
    private final Timer m_sampleSelectTimer;
    private final Timer m_measurementSelectTimer;
    private final Timer m_insertTimer;
    private Duration m_resourceShard = Duration.seconds(DEFAULT_RESOURCE_SHARD_SECONDS);

    /**
     * Creates the repository, prepares the range-select statement and registers the timers.
     *
     * @param session          Cassandra session used for all statements; must not be {@code null}
     * @param ttl              TTL applied to every inserted row; must not be negative
     * @param registry         registry in which the three timers are created; must not be {@code null}
     * @param processorService receives each inserted collection after the batch has executed;
     *                         may be {@code null}, in which case nothing is submitted
     * @throws NullPointerException     if {@code session} or {@code registry} is {@code null}
     * @throws IllegalArgumentException if {@code ttl} is negative
     */
    @Inject
    public CassandraSampleRepository(CassandraSession session, @Named(value="samples.cassandra.time-to-live") int ttl, MetricRegistry registry, SampleProcessorService processorService) {
        this.m_session = Preconditions.checkNotNull(session, "session argument");
        Preconditions.checkArgument(ttl >= 0, "Negative Cassandra column TTL");
        this.m_ttl = ttl;
        Preconditions.checkNotNull(registry, "metric registry argument");
        this.m_processorService = processorService;

        // Prepared once and re-bound for every partition queried by cassandraSelect().
        Select select = QueryBuilder.select().from("samples");
        select.where(QueryBuilder.eq("partition", QueryBuilder.bindMarker("partition")));
        select.where(QueryBuilder.eq("resource", QueryBuilder.bindMarker("resource")));
        select.where(QueryBuilder.gte("collected_at", QueryBuilder.bindMarker("start")));
        select.where(QueryBuilder.lte("collected_at", QueryBuilder.bindMarker("end")));
        this.m_selectStatement = this.m_session.prepare(select);

        this.m_sampleSelectTimer = registry.timer(this.metricName("sample-select-timer"));
        this.m_measurementSelectTimer = registry.timer(this.metricName("measurement-select-timer"));
        this.m_insertTimer = registry.timer(this.metricName("insert-timer"));
    }

    /**
     * Queries the stored samples of a resource and runs them through a {@link ResultProcessor}
     * to produce measurements.
     *
     * <p>When {@code end} is absent the current time is used; when {@code start} is absent it
     * defaults to 86400 seconds before the effective end. The database is queried from one
     * {@code resolution} before the effective start, while the processor itself is given the
     * effective start.
     *
     * @param resource   resource whose samples are read
     * @param start      optional lower bound of the requested range
     * @param end        optional upper bound of the requested range
     * @param descriptor result descriptor; its source names are handed to the driver adapter
     * @param resolution resolution passed to the result processor
     * @return the processed measurements
     * @throws IllegalArgumentException if both bounds are present and {@code start} is after {@code end}
     */
    @Override
    public Results<Measurement> select(Resource resource, Optional<Timestamp> start, Optional<Timestamp> end, ResultDescriptor descriptor, Duration resolution) {
        Timer.Context context = this.m_measurementSelectTimer.time();
        // Everything after the timer starts is inside the try so that failed calls are
        // recorded too, instead of leaving the context unstopped.
        try {
            this.validateSelect(start, end);
            Timestamp upper = CassandraSampleRepository.resolveUpper(end);
            Timestamp lower = CassandraSampleRepository.resolveLower(start, upper);
            // NOTE(review): the extra leading interval is presumably needed to aggregate the
            // first requested step - confirm against ResultProcessor.
            Timestamp queryStart = lower.minus(resolution);
            LOG.debug("Querying database for resource {}, from {} to {}", resource, queryStart, upper);
            DriverAdapter driverAdapter = new DriverAdapter(this.cassandraSelect(resource, queryStart, upper), descriptor.getSourceNames());
            Results<Measurement> results = new ResultProcessor(resource, lower, upper, descriptor, resolution).process(driverAdapter);
            LOG.debug("{} results returned from database", (Object)driverAdapter.getResultCount());
            return results;
        }
        finally {
            context.stop();
        }
    }

    /**
     * Returns the stored samples of a resource within a time range, without aggregation.
     *
     * <p>When {@code end} is absent the current time is used; when {@code start} is absent it
     * defaults to 86400 seconds before the effective end.
     *
     * @param resource resource whose samples are read
     * @param start    optional lower bound of the requested range
     * @param end      optional upper bound of the requested range
     * @return the rows produced by the driver adapter, in iteration order
     * @throws IllegalArgumentException if both bounds are present and {@code start} is after {@code end}
     */
    @Override
    public Results<Sample> select(Resource resource, Optional<Timestamp> start, Optional<Timestamp> end) {
        Timer.Context context = this.m_sampleSelectTimer.time();
        // Everything after the timer starts is inside the try so that failed calls are
        // recorded too, instead of leaving the context unstopped.
        try {
            this.validateSelect(start, end);
            Timestamp upper = CassandraSampleRepository.resolveUpper(end);
            Timestamp lower = CassandraSampleRepository.resolveLower(start, upper);
            LOG.debug("Querying database for resource {}, from {} to {}", resource, lower, upper);
            Results<Sample> samples = new Results<Sample>();
            DriverAdapter driverAdapter = new DriverAdapter(this.cassandraSelect(resource, lower, upper));
            for (Results.Row<Sample> row : driverAdapter) {
                samples.addRow(row);
            }
            LOG.debug("{} results returned from database", (Object)driverAdapter.getResultCount());
            return samples;
        }
        finally {
            context.stop();
        }
    }

    /**
     * Writes the given samples as a single unlogged batch and, if a processor service was
     * supplied at construction, submits the same collection to it once the batch has executed.
     *
     * @param samples samples to persist; each becomes one insert carrying the configured TTL
     */
    @Override
    public void insert(Collection<Sample> samples) {
        Timer.Context context = this.m_insertTimer.time();
        // Batch construction is inside the try as well, so a failure while building the
        // statements still stops the timer context.
        try {
            Batch batch = QueryBuilder.unloggedBatch();
            for (Sample m : samples) {
                batch.add(QueryBuilder.insertInto("samples")
                        .value("partition", m.getTimestamp().stepFloor(this.m_resourceShard).asSeconds())
                        .value("resource", m.getResource().getId())
                        .value("collected_at", m.getTimestamp().asMillis())
                        .value("metric_name", m.getName())
                        .value("value", ValueType.decompose(m.getValue()))
                        .value("attributes", m.getAttributes())
                        .using(QueryBuilder.ttl(this.m_ttl)));
            }
            this.m_session.execute(batch);
            if (this.m_processorService != null) {
                this.m_processorService.submit(samples);
            }
        }
        finally {
            context.stop();
        }
    }

    /**
     * Issues one asynchronous select per partition between the floored {@code start} and the
     * floored {@code end}, and wraps the pending results in a single row iterator.
     *
     * @param resource resource whose id is bound to every statement
     * @param start    lower bound bound to {@code collected_at >= start}
     * @param end      upper bound bound to {@code collected_at <= end}
     * @return iterator over the rows of all issued queries
     */
    private Iterator<Row> cassandraSelect(Resource resource, Timestamp start, Timestamp end) {
        ArrayList<Future<ResultSet>> futures = Lists.newArrayList();
        Timestamp lower = start.stepFloor(this.m_resourceShard);
        Timestamp upper = end.stepFloor(this.m_resourceShard);
        for (Timestamp partition : new IntervalGenerator(lower, upper, this.m_resourceShard)) {
            BoundStatement bindStatement = this.m_selectStatement.bind();
            // NOTE(review): the narrowing cast truncates once asSeconds() exceeds
            // Integer.MAX_VALUE, while insert() passes the uncast value - confirm the column type.
            bindStatement.setInt("partition", (int)partition.asSeconds());
            bindStatement.setString("resource", resource.getId());
            bindStatement.setDate("start", start.asDate());
            bindStatement.setDate("end", end.asDate());
            futures.add(this.m_session.executeAsync(bindStatement));
        }
        return new ConcurrentResultWrapper(futures);
    }

    /**
     * Rejects a range whose start lies after its end. A range with equal bounds, or with
     * either bound absent, is accepted.
     *
     * @throws IllegalArgumentException if both bounds are present and {@code start} is after {@code end}
     */
    private void validateSelect(Optional<Timestamp> start, Optional<Timestamp> end) {
        if (start.isPresent() && end.isPresent() && start.get().gt(end.get())) {
            throw new IllegalArgumentException("start time must be less than end time");
        }
    }

    /** Returns the requested end of a query range, or the current time when none was given. */
    private static Timestamp resolveUpper(Optional<Timestamp> end) {
        return end.isPresent() ? end.get() : Timestamp.now();
    }

    /** Returns the requested start of a query range, or the default window before {@code upper}. */
    private static Timestamp resolveLower(Optional<Timestamp> start, Timestamp upper) {
        return start.isPresent() ? start.get() : upper.minus(Duration.seconds(DEFAULT_QUERY_WINDOW_SECONDS));
    }

    /**
     * Replaces the shard duration used to compute partitions for both inserts and selects.
     * Package-private; the field is neither final nor synchronized.
     */
    void setResourceShard(Duration resourceShard) {
        this.m_resourceShard = resourceShard;
    }

    /** Builds a metric name by joining the {@code "repository"} prefix with {@code suffix}. */
    private String metricName(String suffix) {
        return MetricRegistry.name("repository", suffix);
    }
}

