/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.infra.bq;

import com.google.api.client.util.Clock;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.RetryOption;
import com.google.cloud.TransportOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.ExtractJobConfiguration;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobException;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.LoadConfiguration;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import net.minidev.json.JSONObject;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.pipecraft.infra.bq.BQQuery;
import org.pipecraft.infra.bq.BQQueryExecutionSummary;
import org.pipecraft.infra.bq.BQResultsIterator;
import org.pipecraft.infra.bq.ExportTerminationType;
import org.pipecraft.infra.bq.LoadTerminationType;
import org.pipecraft.infra.bq.QueryExecutionConfig;
import org.pipecraft.infra.bq.QueryTerminationType;
import org.pipecraft.infra.bq.TableExportConfig;
import org.pipecraft.infra.bq.TableLoadConfig;
import org.pipecraft.infra.bq.exceptions.BQException;
import org.pipecraft.infra.bq.exceptions.ClientResourcesBQException;
import org.pipecraft.infra.bq.exceptions.ClientTooManyRowsBQException;
import org.pipecraft.infra.bq.exceptions.IOTransientBQException;
import org.pipecraft.infra.bq.exceptions.InvalidExportBQException;
import org.pipecraft.infra.bq.exceptions.InvalidQueryBQException;
import org.pipecraft.infra.bq.exceptions.InvalidTableLoadBQException;
import org.pipecraft.infra.bq.exceptions.NonTransientBQException;
import org.pipecraft.infra.bq.exceptions.ServerResourcesBQException;
import org.pipecraft.infra.bq.exceptions.ServerTooManyRowsBQException;
import org.pipecraft.infra.bq.exceptions.TimeoutBQException;
import org.pipecraft.infra.bq.exceptions.TransientBQException;
import org.pipecraft.infra.bq.exceptions.UnavailableBQException;
import org.pipecraft.infra.concurrent.AbstractCheckedFuture;
import org.pipecraft.infra.concurrent.CheckedFuture;
import org.pipecraft.infra.concurrent.CheckedFutureTransformer;
import org.pipecraft.infra.io.FileReadOptions;
import org.pipecraft.infra.io.FileUtils;
import org.pipecraft.infra.monitoring.JsonMonitorable;
import org.pipecraft.infra.monitoring.JsonMonitorableWrapper;
import org.pipecraft.infra.monitoring.collectors.ActionStatsCollector;
import org.pipecraft.infra.monitoring.collectors.NonBlockingActionStatsCollector;
import org.pipecraft.infra.storage.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryConnector
implements JsonMonitorable {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(BigQueryConnector.class);
    // Formats a LocalDate as the yyyyMMdd suffix appended after '$' when addressing a table partition.
    private static final DateTimeFormatter PARTITION_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");
    // Per-query-class statistics collectors; entries are created on demand by getStatsFor().
    private final ConcurrentHashMap<Class<? extends BQQuery<?, ?>>, ActionStatsCollector<QueryTerminationType>> stats = new ConcurrentHashMap();
    // Project id passed to the BigQuery client and used when building table ids.
    private final String projectId;
    // BigQuery client created by buildService().
    private final BigQuery bigquery;
    // Consumer of query execution summaries, as supplied to the constructor.
    private final Consumer<BQQueryExecutionSummary> observer;
    // Execution configuration used by the overloads that take no explicit configuration.
    private final QueryExecutionConfig defaultExecConfig;
    // The executor exactly as supplied to the constructor; returned by getExecutorService().
    private final ExecutorService providedExecutor;
    // Listening decorator around providedExecutor; all tasks are submitted through it.
    private final ListeningExecutorService ex;
    // Statistics of table export operations.
    private final ActionStatsCollector<ExportTerminationType> exportStats = new NonBlockingActionStatsCollector(ExportTerminationType.class);
    // Statistics of table load operations.
    private final ActionStatsCollector<LoadTerminationType> loadStats = new NonBlockingActionStatsCollector(LoadTerminationType.class);

    public BigQueryConnector(String projectId, long connTimeoutMs, long readTimeoutMs, QueryExecutionConfig defaultExecutionConfig, Consumer<BQQueryExecutionSummary> observer, ExecutorService ex) throws IOException {
        this.projectId = projectId;
        this.bigquery = this.buildService(connTimeoutMs, readTimeoutMs);
        this.defaultExecConfig = defaultExecutionConfig;
        this.observer = observer;
        this.providedExecutor = ex;
        this.ex = MoreExecutors.listeningDecorator((ExecutorService)this.providedExecutor);
    }

    /**
     * Builds the BigQuery client for {@code projectId} using the application default credentials
     * and an HTTP transport configured with the given timeouts.
     *
     * @param connTimeoutMs connect timeout in milliseconds
     * @param readTimeoutMs read timeout in milliseconds
     * @return the BigQuery service object
     * @throws IOException if the application default credentials can't be obtained
     */
    private BigQuery buildService(long connTimeoutMs, long readTimeoutMs) throws IOException {
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpTransportOptions transportOptions = HttpTransportOptions.newBuilder()
            .setConnectTimeout(BigQueryConnector.clampTimeoutToInt(connTimeoutMs))
            .setReadTimeout(BigQueryConnector.clampTimeoutToInt(readTimeoutMs))
            .build();
        return BigQueryOptions.newBuilder()
            .setProjectId(this.projectId)
            .setCredentials(credentials)
            .setTransportOptions(transportOptions)
            .build()
            .getService();
    }

    /**
     * Converts a millisecond timeout to the int expected by the transport options, saturating at
     * the int range instead of silently wrapping around as a plain narrowing cast would.
     */
    private static int clampTimeoutToInt(long timeoutMs) {
        return (int)Math.max((long)Integer.MIN_VALUE, Math.min((long)Integer.MAX_VALUE, timeoutMs));
    }

    /**
     * @return the project id this connector was created with
     */
    public String getProjectId() {
        return projectId;
    }

    /**
     * @return the execution configuration applied when a query is run without an explicit one
     */
    public QueryExecutionConfig getDefaultQueryExecutionConfig() {
        return defaultExecConfig;
    }

    /**
     * @return the executor supplied to the constructor (not the listening decorator wrapping it)
     */
    public ExecutorService getExecutorService() {
        return providedExecutor;
    }

    /**
     * Submits a query task to the executor and wraps the resulting future.
     * If the executor rejects the task, the rejection is recorded in the query's statistics as
     * FAILED_CLIENT_RESOURCES_LIMIT and an already-failed future carrying a
     * {@link ClientResourcesBQException} is returned instead.
     *
     * @param query the query to run
     * @param config the execution configuration to apply
     * @param streamResults flag handed to the query task
     * @return the future of the query results
     */
    private <R, F> BQQueryResultFuture<R, F> executeAsync(BQQuery<R, F> query, QueryExecutionConfig config, boolean streamResults) {
        try {
            ListenableFuture<BQResultsIterator<R, F>> pending = this.ex.submit(new QueryTask<R, F>(query, config, streamResults));
            return new BQQueryResultFuture<R, F>(pending);
        }
        catch (RejectedExecutionException rejected) {
            @SuppressWarnings("unchecked")
            Class<? extends BQQuery<?, ?>> queryClass = (Class<? extends BQQuery<?, ?>>)query.getClass();
            this.getStatsFor(queryClass).startAndEnd(QueryTerminationType.FAILED_CLIENT_RESOURCES_LIMIT);
            ListenableFuture<BQResultsIterator<R, F>> failed = Futures.immediateFailedFuture(new ClientResourcesBQException());
            return new BQQueryResultFuture<R, F>(failed);
        }
    }

    /**
     * Runs the query asynchronously with the given configuration, with result streaming enabled.
     *
     * @param query the query to run
     * @param config the execution configuration to apply
     * @return the future of the query results
     */
    public <R, F> BQQueryResultFuture<R, F> executeAsync(BQQuery<R, F> query, QueryExecutionConfig config) {
        final boolean streamResults = true;
        return this.executeAsync(query, config, streamResults);
    }

    /**
     * Runs the query with the given configuration and obtains the result through
     * {@code checkedGet()} on the asynchronous future.
     *
     * @param query the query to run
     * @param config the execution configuration to apply
     * @return the iterator over the query results
     * @throws InterruptedException as thrown by {@code checkedGet()}
     * @throws BQException as thrown by {@code checkedGet()}
     */
    public <R, F> BQResultsIterator<R, F> execute(BQQuery<R, F> query, QueryExecutionConfig config) throws InterruptedException, BQException {
        BQQueryResultFuture<R, F> pending = this.executeAsync(query, config);
        return pending.checkedGet();
    }

    /**
     * Runs the query asynchronously with the given configuration, with result streaming disabled.
     *
     * @param query the query to run
     * @param config the execution configuration to apply
     * @return the future of the query results
     */
    public <R, F> BQQueryResultFuture<R, F> executeNoStreamingAsync(BQQuery<R, F> query, QueryExecutionConfig config) {
        final boolean streamResults = false;
        return this.executeAsync(query, config, streamResults);
    }

    /**
     * Runs the query with the given configuration and result streaming disabled.
     *
     * @param query the query to run
     * @param config the execution configuration to apply
     * @return the total record count reported by the results iterator
     * @throws InterruptedException as thrown by {@code checkedGet()}
     * @throws BQException as thrown by {@code checkedGet()}
     */
    public long executeNoStreaming(BQQuery<?, ?> query, QueryExecutionConfig config) throws InterruptedException, BQException {
        return this.executeNoStreamingAsync(query, config).checkedGet().totalRecordCount();
    }

    /**
     * Runs the query asynchronously with the default execution configuration, with result
     * streaming enabled.
     *
     * @param query the query to run
     * @return the future of the query results
     */
    public <R, F> BQQueryResultFuture<R, F> executeAsync(BQQuery<R, F> query) {
        QueryExecutionConfig config = this.defaultExecConfig;
        return this.executeAsync(query, config, true);
    }

    /**
     * Runs the query with the default execution configuration and obtains the result through
     * {@code checkedGet()} on the asynchronous future.
     *
     * @param query the query to run
     * @return the iterator over the query results
     * @throws InterruptedException as thrown by {@code checkedGet()}
     * @throws BQException as thrown by {@code checkedGet()}
     */
    public <R, F> BQResultsIterator<R, F> execute(BQQuery<R, F> query) throws InterruptedException, BQException {
        BQQueryResultFuture<R, F> pending = this.executeAsync(query);
        return pending.checkedGet();
    }

    /**
     * Runs the query asynchronously with the default execution configuration and result streaming
     * disabled. The query result is discarded: the returned future is a transformation of the
     * query future that always maps the result to {@code null}.
     *
     * @param query the query to run
     * @return a future for the query's completion
     */
    public CheckedFuture<Void, BQException> executeNoStreamingAsync(BQQuery<?, ?> query) {
        BQQueryResultFuture<?, ?> queryFuture = this.executeAsync(query, this.defaultExecConfig, false);
        return new CheckedFutureTransformer(queryFuture, ignoredResult -> null);
    }

    /**
     * Runs the query with the default execution configuration and result streaming disabled,
     * obtaining completion through {@code checkedGet()} on the asynchronous future.
     *
     * @param query the query to run
     * @throws BQException as thrown by {@code checkedGet()}
     * @throws InterruptedException as thrown by {@code checkedGet()}
     */
    public void executeNoStreaming(BQQuery<?, ?> query) throws BQException, InterruptedException {
        CheckedFuture<Void, BQException> pending = this.executeNoStreamingAsync(query);
        pending.checkedGet();
    }

    /**
     * Turns an arbitrary name into a string restricted to {@code [a-z0-9_-]} of at most 63
     * characters.
     * <p>
     * The name is lower-cased and every character outside {@code [a-z0-9_]} is dropped. If that
     * loses information (characters were dropped, or the result is longer than 63 characters),
     * the result is cut to 58 characters and suffixed with {@code -} and the low 16 bits of the
     * CRC32 of the original name as 4 hex digits, so different inputs are unlikely to collide.
     * <p>
     * Lower-casing uses {@link java.util.Locale#ROOT} and the CRC is computed over the UTF-8
     * bytes, so the output does not depend on the JVM's default locale or charset.
     *
     * @param str0 the original name; must not be null
     * @return the sanitized name
     */
    private static String labelizeName(String str0) {
        // Locale.ROOT: with e.g. a Turkish default locale, "I" would lower-case to a dotless i
        // and then be stripped by the character filter below.
        String str = str0.toLowerCase(java.util.Locale.ROOT).replaceAll("[^a-z0-9_]", "");
        if (str.length() > 63 || !str.equalsIgnoreCase(str0)) {
            // 58 chars + '-' + 4 hex digits = 63 chars.
            if (str.length() > 58) {
                str = str.substring(0, 58);
            }
            CRC32 crc = new CRC32();
            // Explicit charset keeps the suffix stable across platforms.
            crc.update(str0.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            str = String.format(java.util.Locale.ROOT, "%s-%04x", str, crc.getValue() & 0xFFFFL);
        }
        return str;
    }

    /**
     * Sets the labels of the given query job configuration to {@code name=<label>} and
     * {@code lang=java}.
     *
     * @param jobConfiguration the builder to set the labels on
     * @param label the value of the {@code name} label
     */
    private static void setJobConfigurationLabel(QueryJobConfiguration.Builder jobConfiguration, String label) {
        Map<String, String> labels = new HashMap<String, String>();
        labels.put("lang", "java");
        labels.put("name", label);
        jobConfiguration.setLabels(labels);
    }

    /**
     * Checks whether a table exists in this connector's project.
     *
     * @param dataset the dataset name
     * @param table the table name
     * @return true if the BigQuery client returns a non-null table for the reference
     */
    public boolean tableExists(String dataset, String table) {
        TableId tableId = TableId.of(this.projectId, dataset, table);
        Table found = this.bigquery.getTable(tableId);
        return found != null;
    }

    /**
     * Updates the expiration time of a table in this connector's project.
     * <p>
     * NOTE(review): the table returned by the client is dereferenced without a null check, while
     * {@code tableExists()} treats a null result as "no such table" - confirm callers only pass
     * existing tables.
     *
     * @param datasetId the dataset name
     * @param tableName the table name
     * @param duration time from now until expiration, or null to set a null expiration time
     * @param timeUnit the unit of {@code duration}
     */
    public void updateTableExpiration(String datasetId, String tableName, Integer duration, TimeUnit timeUnit) {
        TableId tableId = TableId.of(this.projectId, datasetId, tableName);
        Table table = this.bigquery.getTable(tableId);
        Long expirationTime = null;
        if (duration != null) {
            // Absolute expiration timestamp in epoch millis.
            expirationTime = Clock.SYSTEM.currentTimeMillis() + timeUnit.toMillis(duration.intValue());
        }
        Table definition = table.toBuilder().setExpirationTime(expirationTime).build();
        this.bigquery.update(definition);
    }

    /**
     * @return an empty JSON object; the connector reports metrics only through its children
     */
    public JSONObject getOwnMetrics() {
        JSONObject noOwnMetrics = new JSONObject();
        return noOwnMetrics;
    }

    /**
     * Builds the monitoring children of this connector.
     *
     * @return a map with three entries: {@code "queries"} (a wrapper around the per-query stats,
     * keyed by the query class simple name), {@code "exports"} and {@code "loads"}
     */
    public Map<String, ? extends JsonMonitorable> getChildren() {
        Map<String, JsonMonitorable> perQuery = new HashMap<String, JsonMonitorable>();
        this.stats.forEach((queryClass, collector) -> perQuery.put(queryClass.getSimpleName(), (JsonMonitorable)collector));

        Map<String, JsonMonitorable> children = new HashMap<String, JsonMonitorable>();
        children.put("queries", new JsonMonitorableWrapper(perQuery));
        children.put("exports", (JsonMonitorable)this.exportStats);
        children.put("loads", (JsonMonitorable)this.loadStats);
        return children;
    }

    /**
     * Writes a debug log line summarizing the end of a query execution.
     *
     * @param durationMicro the execution duration in microseconds (logged in whole seconds)
     * @param e the failure, or null if the query completed; passed to the logger as the throwable
     * @param jobId the job id, or null if not available (logged as "N/A")
     * @param qType the query type name
     * @param bytesProcessed the number of processed bytes, or null if unknown (logged as "N/A")
     * @param totalRows the number of result rows, or null if unknown (logged as "N/A")
     */
    private static void logQueryEnd(long durationMicro, Throwable e, String jobId, String qType, Long bytesProcessed, Long totalRows) {
        String outcome = "failed";
        if (e == null) {
            outcome = "completed";
        }
        String jobText = "N/A";
        if (jobId != null) {
            jobText = jobId;
        }
        String bytesText = "N/A";
        if (bytesProcessed != null) {
            bytesText = bytesProcessed.toString();
        }
        String rowsText = "N/A";
        if (totalRows != null) {
            rowsText = totalRows.toString();
        }
        long seconds = durationMicro / 1000000L;
        String message = "Query '" + qType + "' (Job ID " + jobText + ") execution " + outcome + ". Duration: " + seconds + " sec, Processed: " + bytesText + " bytes, Rows: " + rowsText;
        log.debug(message, e);
    }

    /**
     * Returns the statistics collector of the given query class, creating and registering a new
     * one if the class has none yet.
     *
     * @param c the query class
     * @return the collector registered for {@code c}
     */
    private ActionStatsCollector<QueryTerminationType> getStatsFor(Class<? extends BQQuery<?, ?>> c) {
        ActionStatsCollector<QueryTerminationType> collector = this.stats.computeIfAbsent(c, queryClass -> new NonBlockingActionStatsCollector(QueryTerminationType.class));
        return collector;
    }

    /**
     * Submits a table export task to the executor.
     * If the executor rejects the task, the rejection is recorded in the export statistics as
     * FAILED_CLIENT_RESOURCES_LIMIT and an already-failed future carrying a
     * {@link ClientResourcesBQException} is returned instead.
     *
     * @param config the export configuration
     * @return the future of the export operation
     */
    public BQExportFuture exportTableAsync(TableExportConfig config) {
        try {
            ListenableFuture<Void> pending = this.ex.submit(new ExportTask(config));
            return new BQExportFuture(pending);
        }
        catch (RejectedExecutionException rejected) {
            this.exportStats.startAndEnd(ExportTerminationType.FAILED_CLIENT_RESOURCES_LIMIT);
            ListenableFuture<Void> failed = Futures.immediateFailedFuture(new ClientResourcesBQException());
            return new BQExportFuture(failed);
        }
    }

    /**
     * Exports a table, obtaining completion through {@code checkedGet()} on the asynchronous
     * export future.
     *
     * @param config the export configuration
     * @throws InterruptedException as thrown by {@code checkedGet()}
     * @throws BQException as thrown by {@code checkedGet()}
     */
    public void exportTable(TableExportConfig config) throws InterruptedException, BQException {
        BQExportFuture pending = this.exportTableAsync(config);
        pending.checkedGet();
    }

    /**
     * Submits a table load task to the executor.
     * If the executor rejects the task, the rejection is recorded in the load statistics as
     * FAILED_CLIENT_RESOURCES_LIMIT and an already-failed future carrying a
     * {@link ClientResourcesBQException} is returned instead.
     *
     * @param tableLoadConfig the load configuration
     * @return the future of the load operation
     */
    public BQTableLoadFuture loadTableAsync(TableLoadConfig tableLoadConfig) {
        try {
            ListenableFuture<Void> pending = this.ex.submit(new TableLoadTask(tableLoadConfig));
            return new BQTableLoadFuture(pending);
        }
        catch (RejectedExecutionException rejected) {
            this.loadStats.startAndEnd(LoadTerminationType.FAILED_CLIENT_RESOURCES_LIMIT);
            ListenableFuture<Void> failed = Futures.immediateFailedFuture(new ClientResourcesBQException());
            return new BQTableLoadFuture(failed);
        }
    }

    /**
     * Loads a table, obtaining completion through {@code checkedGet()} on the asynchronous load
     * future.
     *
     * @param tableLoadConfig the load configuration
     * @throws InterruptedException as thrown by {@code checkedGet()}
     * @throws BQException as thrown by {@code checkedGet()}
     */
    public void loadTable(TableLoadConfig tableLoadConfig) throws InterruptedException, BQException {
        BQTableLoadFuture pending = this.loadTableAsync(tableLoadConfig);
        pending.checkedGet();
    }

    /**
     * Verifies that the given job finished without an error.
     *
     * @param job the job to check
     * @throws BigQueryException (code 400) if the job status carries an error. The message is the
     * newline-joined messages of the job's execution errors; when no execution errors are
     * reported, the message of the primary error is used instead.
     */
    private void validateJobStatus(Job job) {
        BigQueryError err = job.getStatus().getError();
        if (err == null) {
            return;
        }
        String concatenatedMsgs = "";
        // The execution error list may be absent even when a primary error is set.
        if (job.getStatus().getExecutionErrors() != null) {
            concatenatedMsgs = job.getStatus().getExecutionErrors().stream().map(BigQueryError::getMessage).collect(Collectors.joining("\n"));
        }
        if (concatenatedMsgs.isEmpty()) {
            concatenatedMsgs = err.getMessage();
        }
        throw new BigQueryException(400, concatenatedMsgs, err);
    }

    /**
     * Checked future of a table load operation, failing with {@link BQException}.
     */
    public static class BQTableLoadFuture
    extends AbstractCheckedFuture<Void, BQException> {
        public BQTableLoadFuture(ListenableFuture<Void> future) {
            super(future);
        }

        /**
         * Maps a task failure to the checked exception type: runtime exceptions are rethrown
         * as-is, anything else is cast to {@link BQException}.
         */
        protected BQException map(Exception e) {
            if (!(e instanceof RuntimeException)) {
                return (BQException)e;
            }
            throw (RuntimeException)e;
        }
    }

    private class TableLoadTask
    implements Callable<Void> {
        private final TableLoadConfig loadConfig;

        public TableLoadTask(TableLoadConfig loadConfig) {
            this.loadConfig = loadConfig;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws BQException, InterruptedException {
            String jobIdStr;
            WriteChannelConfiguration.Builder bqLocalConfig;
            LoadTerminationType terminationType = LoadTerminationType.SUCCESS;
            BQException exc = null;
            Job job = null;
            log.debug("Executing table load: " + this.loadConfig);
            BigQueryConnector.this.loadStats.start();
            try {
                TableId destinationTable = this.loadConfig.getDestinationTableReference();
                if (this.loadConfig.isRemoteLoad()) {
                    LoadJobConfiguration.Builder bqConfig = LoadJobConfiguration.newBuilder((TableId)destinationTable, new ArrayList<String>(this.loadConfig.getSourceURIs())).setSchemaUpdateOptions(Collections.singletonList(JobInfo.SchemaUpdateOption.ALLOW_FIELD_ADDITION)).setCreateDisposition(this.loadConfig.getCreateDisposition()).setWriteDisposition(this.loadConfig.getWriteDisposition()).setDestinationTable(this.loadConfig.getDestinationTableReference()).setFormatOptions(this.getFormatOptions(this.loadConfig)).setJobTimeoutMs(this.loadConfig.getTimeoutMs()).setSchema(this.loadConfig.getTableSchema()).setSourceUris(new ArrayList<String>(this.loadConfig.getSourceURIs()));
                    this.setPartitionConfig((LoadConfiguration.Builder)bqConfig, this.loadConfig);
                    this.setClusteringConfig((LoadConfiguration.Builder)bqConfig, this.loadConfig);
                    job = BigQueryConnector.this.bigquery.create(JobInfo.of((JobConfiguration)bqConfig.build()), new BigQuery.JobOption[0]);
                } else {
                    bqLocalConfig = WriteChannelConfiguration.newBuilder((TableId)destinationTable).setSchemaUpdateOptions(Collections.singletonList(JobInfo.SchemaUpdateOption.ALLOW_FIELD_ADDITION)).setCreateDisposition(this.loadConfig.getCreateDisposition()).setWriteDisposition(this.loadConfig.getWriteDisposition()).setDestinationTable(this.loadConfig.getDestinationTableReference()).setFormatOptions(this.getFormatOptions(this.loadConfig)).setSchema(this.loadConfig.getTableSchema());
                    this.setPartitionConfig((LoadConfiguration.Builder)bqLocalConfig, this.loadConfig);
                    this.setClusteringConfig((LoadConfiguration.Builder)bqLocalConfig, this.loadConfig);
                    WriteChannelConfiguration writeConfig = bqLocalConfig.build();
                    JobId jobId = JobId.newBuilder().build();
                    try (TableDataWriteChannel writer = BigQueryConnector.this.bigquery.writer(jobId, writeConfig);){
                        this.writeFilesToWriter(writer);
                    }
                    job = writer.getJob();
                }
                job = job.waitFor(new RetryOption[0]);
                BigQueryConnector.this.validateJobStatus(job);
                BigQueryConnector.this.updateTableExpiration(destinationTable.getDataset(), destinationTable.getTable(), this.loadConfig.getDestinationTableExpirationHs(), TimeUnit.HOURS);
                bqLocalConfig = null;
                jobIdStr = job == null ? "N/A" : job.getJobId().getJob();
            }
            catch (Throwable e) {
                try {
                    Pair<BQException, LoadTerminationType> errDetails = this.mapLoadException(e, this.loadConfig);
                    exc = (BQException)errDetails.getLeft();
                    terminationType = (LoadTerminationType)((Object)errDetails.getRight());
                    if (terminationType == LoadTerminationType.INTERNAL_ERROR) {
                        throw new RuntimeException(exc);
                    }
                    throw exc;
                }
                catch (Throwable throwable) {
                    String jobIdStr2 = job == null ? "N/A" : job.getJobId().getJob();
                    long durationMicro = BigQueryConnector.this.loadStats.end((Enum)terminationType);
                    log.debug("Done table load (job=" + jobIdStr2 + ") " + (exc == null ? "without" : "with") + " error. Duration: " + durationMicro / 1000L + " ms");
                    throw throwable;
                }
            }
            long durationMicro = BigQueryConnector.this.loadStats.end((Enum)terminationType);
            log.debug("Done table load (job=" + jobIdStr + ") " + (exc == null ? "without" : "with") + " error. Duration: " + durationMicro / 1000L + " ms");
            return bqLocalConfig;
        }

        private void setPartitionConfig(LoadConfiguration.Builder bqConfig, TableLoadConfig loadConfig) {
            LocalDate date = loadConfig.getDestinationTablePartition();
            if (date != null) {
                TableId tableId = loadConfig.getDestinationTableReference();
                tableId = TableId.of((String)tableId.getDataset(), (String)(tableId.getTable() + "$" + PARTITION_DATE_FORMAT.format(date)));
                bqConfig.setDestinationTable(tableId);
                bqConfig.setTimePartitioning(TimePartitioning.of((TimePartitioning.Type)TimePartitioning.Type.DAY));
            }
        }

        private void setClusteringConfig(LoadConfiguration.Builder bqConfig, TableLoadConfig loadConfig) {
            Set<String> fields = loadConfig.getClusteringFields();
            if (fields != null && !fields.isEmpty()) {
                bqConfig.setClustering(Clustering.newBuilder().setFields(new ArrayList<String>(fields)).build());
            }
        }

        private FormatOptions getFormatOptions(TableLoadConfig loadConfig) {
            TableLoadConfig.LoadFormat format = loadConfig.getLoadFormat();
            if (format == TableLoadConfig.LoadFormat.CSV) {
                return CsvOptions.newBuilder().setFieldDelimiter(loadConfig.getCsvFieldDelimiter()).setAllowJaggedRows(loadConfig.getAllowJaggedRows()).setSkipLeadingRows(loadConfig.getCSVHasHeader() ? 1L : 0L).build();
            }
            return FormatOptions.of((String)loadConfig.getLoadFormat().name());
        }

        private void writeFilesToWriter(TableDataWriteChannel writer) throws IOException {
            File[] files = null;
            try (OutputStream os = Channels.newOutputStream((WritableByteChannel)writer);){
                for (String localPath : this.loadConfig.getSourceURIs()) {
                    String filenamePart = PathUtils.getLastPathPart((String)localPath);
                    String parentPath = PathUtils.getParentPath((String)localPath);
                    WildcardFileFilter fileFilter = new WildcardFileFilter(filenamePart);
                    for (File f : files = new File(parentPath).listFiles((FileFilter)fileFilter)) {
                        try (BufferedInputStream is = FileUtils.getInputStream((File)f, (FileReadOptions)new FileReadOptions().detectCompression(filenamePart));){
                            IOUtils.copy((InputStream)is, (OutputStream)os);
                        }
                    }
                    if (files != null && files.length != 0) continue;
                    throw new FileNotFoundException();
                }
            }
        }

        private Pair<BQException, LoadTerminationType> mapLoadException(Throwable e, TableLoadConfig config) {
            if (e == null) {
                return new ImmutablePair(null, (Object)LoadTerminationType.SUCCESS);
            }
            if (e.getCause() instanceof SocketTimeoutException) {
                return new ImmutablePair((Object)new TimeoutBQException("Table load timed out.", e.getCause()), (Object)LoadTerminationType.FAILED_TIMEOUT);
            }
            String reason = null;
            if (e instanceof JobException) {
                reason = ((BigQueryError)((JobException)e).getErrors().get(0)).getReason();
            } else if (e instanceof BigQueryException) {
                reason = ((BigQueryException)e).getReason();
            }
            if (reason != null) {
                switch (reason) {
                    case "notFound": 
                    case "invalid": {
                        return new ImmutablePair((Object)new InvalidTableLoadBQException("Illegal table load request: " + e.getMessage(), config), (Object)LoadTerminationType.FAILED_INVALID_REQUEST);
                    }
                    case "backendError": 
                    case "internalError": {
                        return new ImmutablePair((Object)new UnavailableBQException("BQ server side error", e), (Object)LoadTerminationType.FAILED_SERVER_ERROR);
                    }
                    case "stopped": {
                        return new ImmutablePair((Object)new TimeoutBQException("Table load timed out. Job stopped.", e.getCause()), (Object)LoadTerminationType.FAILED_TIMEOUT);
                    }
                }
            }
            if (e instanceof FileNotFoundException) {
                return new ImmutablePair((Object)new InvalidTableLoadBQException("Local load with no matching files", config), (Object)LoadTerminationType.FAILED_OTHER);
            }
            if (e instanceof IOException) {
                return new ImmutablePair((Object)new IOTransientBQException("IO error", e), (Object)LoadTerminationType.FAILED_OTHER);
            }
            if (e instanceof RuntimeException || e instanceof Error) {
                return new ImmutablePair((Object)new NonTransientBQException("Internal error", e), (Object)LoadTerminationType.INTERNAL_ERROR);
            }
            return new ImmutablePair((Object)new TransientBQException("Unspecified error", e), (Object)LoadTerminationType.FAILED_OTHER);
        }
    }

    /**
     * Checked future of a table export operation, failing with {@link BQException}.
     */
    public static class BQExportFuture
    extends AbstractCheckedFuture<Void, BQException> {
        public BQExportFuture(ListenableFuture<Void> future) {
            super(future);
        }

        /**
         * Maps a task failure to the checked exception type: runtime exceptions are rethrown
         * as-is, anything else is cast to {@link BQException}.
         */
        protected BQException map(Exception e) {
            if (!(e instanceof RuntimeException)) {
                return (BQException)e;
            }
            throw (RuntimeException)e;
        }
    }

    private class ExportTask
    implements Callable<Void> {
        private final TableExportConfig exportConfig;

        public ExportTask(TableExportConfig exportConfig) {
            this.exportConfig = exportConfig;
        }

        @Override
        public Void call() throws BQException, InterruptedException {
            String jobIdStr;
            Void void_;
            ExportTerminationType terminationType = ExportTerminationType.SUCCESS;
            BQException exc = null;
            Job job = null;
            log.debug("Executing export: " + this.exportConfig);
            BigQueryConnector.this.exportStats.start();
            try {
                ExtractJobConfiguration config = ExtractJobConfiguration.newBuilder((TableId)this.exportConfig.getSourceTableReference(), new ArrayList<String>(this.exportConfig.getDestinationURIs())).setCompression(this.exportConfig.getCompression().name()).setFieldDelimiter(String.valueOf(this.exportConfig.getCSVFieldDelimiter())).setFormat(this.exportConfig.getExportFormat().name()).setJobTimeoutMs(this.exportConfig.getTimeoutMs()).setPrintHeader(Boolean.valueOf(this.exportConfig.isPrintHeader())).build();
                job = BigQueryConnector.this.bigquery.create(JobInfo.of((JobConfiguration)config), new BigQuery.JobOption[0]);
                job = job.waitFor(new RetryOption[0]);
                BigQueryConnector.this.validateJobStatus(job);
                void_ = null;
                jobIdStr = job == null ? "N/A" : job.getJobId().getJob();
            }
            catch (Throwable e) {
                try {
                    Pair<BQException, ExportTerminationType> errDetails = this.mapExportException(e, this.exportConfig);
                    exc = (BQException)errDetails.getLeft();
                    terminationType = (ExportTerminationType)((Object)errDetails.getRight());
                    if (terminationType == ExportTerminationType.INTERNAL_ERROR) {
                        throw new RuntimeException(exc);
                    }
                    throw exc;
                }
                catch (Throwable throwable) {
                    String jobIdStr2 = job == null ? "N/A" : job.getJobId().getJob();
                    long durationMicro = BigQueryConnector.this.exportStats.end((Enum)terminationType);
                    log.debug("Done export (job=" + jobIdStr2 + ") " + (exc == null ? "without" : "with") + " error. Duration: " + durationMicro / 1000L + " ms");
                    throw throwable;
                }
            }
            long durationMicro = BigQueryConnector.this.exportStats.end((Enum)terminationType);
            log.debug("Done export (job=" + jobIdStr + ") " + (exc == null ? "without" : "with") + " error. Duration: " + durationMicro / 1000L + " ms");
            return void_;
        }

        private Pair<BQException, ExportTerminationType> mapExportException(Throwable e, TableExportConfig config) {
            if (e == null) {
                return new ImmutablePair(null, (Object)ExportTerminationType.SUCCESS);
            }
            if (e.getCause() instanceof SocketTimeoutException) {
                return new ImmutablePair((Object)new TimeoutBQException("Export timed out.", e.getCause()), (Object)ExportTerminationType.FAILED_TIMEOUT);
            }
            String reason = null;
            if (e instanceof JobException) {
                reason = ((BigQueryError)((JobException)e).getErrors().get(0)).getReason();
            } else if (e instanceof BigQueryException) {
                reason = ((BigQueryException)e).getReason();
            }
            if (reason != null) {
                switch (reason) {
                    case "notFound": 
                    case "invalid": {
                        return new ImmutablePair((Object)new InvalidExportBQException("Illegal export request: " + e.getMessage(), config), (Object)ExportTerminationType.FAILED_INVALID_REQUEST);
                    }
                    case "backendError": 
                    case "internalError": {
                        return new ImmutablePair((Object)new UnavailableBQException("BQ server side error", e), (Object)ExportTerminationType.FAILED_SERVER_ERROR);
                    }
                    case "stopped": {
                        return new ImmutablePair((Object)new TimeoutBQException("Export timed out. Job stopped.", e.getCause()), (Object)ExportTerminationType.FAILED_TIMEOUT);
                    }
                }
            }
            if (e instanceof RuntimeException || e instanceof Error) {
                return new ImmutablePair((Object)new NonTransientBQException("Internal error", e), (Object)ExportTerminationType.INTERNAL_ERROR);
            }
            return new ImmutablePair((Object)new TransientBQException("Unspecified error", e), (Object)ExportTerminationType.FAILED_OTHER);
        }
    }

    /**
     * Checked future of a query's results iterator, failing with {@link BQException}.
     */
    public static class BQQueryResultFuture<R, F>
    extends AbstractCheckedFuture<BQResultsIterator<R, F>, BQException> {
        public BQQueryResultFuture(ListenableFuture<BQResultsIterator<R, F>> future) {
            super(future);
        }

        /**
         * Maps a task failure to the checked exception type: runtime exceptions are rethrown
         * as-is, anything else is cast to {@link BQException}.
         */
        protected BQException map(Exception e) {
            if (!(e instanceof RuntimeException)) {
                return (BQException)e;
            }
            throw (RuntimeException)e;
        }
    }

    /**
     * Callable that executes a single {@link BQQuery} as a BigQuery job and returns its
     * results wrapped in a {@link BQResultsIterator}.
     * This is an inner (non-static) class: it uses the enclosing connector's BigQuery client,
     * per-query-class stats collectors and optional execution observer.
     *
     * @param <R> first type parameter of the executed {@link BQQuery}
     * @param <F> second type parameter of the executed {@link BQQuery}
     */
    private class QueryTask<R, F>
    implements Callable<BQResultsIterator<R, F>> {
        /** The query to execute. */
        private final BQQuery<R, F> query;
        /** Execution settings (destination table, page size, limits, priority, timeout...). */
        private final QueryExecutionConfig config;
        /** When false, the returned iterator yields no rows (only the total row count is exposed). */
        private final boolean streamResults;

        /**
         * @param query the query to execute
         * @param config the execution settings to apply
         * @param streamResults whether result rows should be iterated by the returned iterator
         */
        public QueryTask(BQQuery<R, F> query, QueryExecutionConfig config, boolean streamResults) {
            this.query = query;
            this.config = config;
            this.streamResults = streamResults;
        }

        /**
         * Runs the query job, collects its statistics and returns the results iterator.
         * Whatever the outcome, the end of the execution is recorded in the stats collector,
         * logged, and reported to the connector's observer (when one is set).
         *
         * @return an iterator over the mapped result rows (empty when result streaming is off)
         * @throws BQException when the execution fails; the concrete type is chosen by
         *         {@link #mapQueryException(Throwable, BQQuery)}
         * @throws RuntimeException wrapping the mapped exception when the failure is classified
         *         as an internal error
         * @throws InterruptedException declared by the signature; note that every Throwable
         *         raised by the execution is caught and mapped below
         */
        @Override
        public BQResultsIterator<R, F> call() throws BQException, InterruptedException {
            String jobIdStr;
            BQResultsIterator<R, F> bQResultsIterator;
            Class<?> qClass = this.query.getClass();
            String qClassName = qClass.getSimpleName();
            ActionStatsCollector<QueryTerminationType> qStats = BigQueryConnector.this.getStatsFor(qClass);
            qStats.start();
            long startTime = System.currentTimeMillis();
            // Everything below stays null/false until known, so a failure report carries
            // only what was collected up to the point of failure.
            Boolean isCacheHit = null;
            Long totalRows = null;
            Job job = null;
            Long bytes = null;
            String sql = null;
            BQException exc = null;
            boolean isBQSuccess = false;
            QueryTerminationType terminationType = QueryTerminationType.SUCCESS;
            try {
                sql = this.query.getSQL();
                // Guarded like the debug message in buildQueryJobConfig, to skip building
                // the message when debug logging is off.
                if (log.isDebugEnabled()) {
                    log.debug("Executing '" + qClassName + "': " + sql);
                }
                TableId tableRef = this.config.getDestinationTableReference();
                QueryJobConfiguration queryJobConfig = this.buildQueryJobConfig(sql, tableRef, this.config);
                job = BigQueryConnector.this.bigquery.create(JobInfo.of((JobConfiguration)queryJobConfig), new BigQuery.JobOption[0]);
                TableResult res = job.getQueryResults(new BigQuery.QueryResultsOption[]{BigQuery.QueryResultsOption.pageSize((long)this.config.getMaxRowsPerPage())});
                if (tableRef != null && this.config.getDestinationTableExpirationHs() != null) {
                    BigQueryConnector.this.updateTableExpiration(tableRef.getDataset(), tableRef.getTable(), this.config.getDestinationTableExpirationHs(), TimeUnit.HOURS);
                }
                // Re-fetch the job, requesting only its statistics field.
                job = BigQueryConnector.this.bigquery.getJob(job.getJobId(), new BigQuery.JobOption[]{BigQuery.JobOption.fields((BigQuery.JobField[])new BigQuery.JobField[]{BigQuery.JobField.STATISTICS})});
                JobStatistics.QueryStatistics qstats = (JobStatistics.QueryStatistics)job.getStatistics();
                bytes = qstats.getTotalBytesProcessed();
                isCacheHit = qstats.getCacheHit();
                totalRows = res.getTotalRows();
                // Set before the client-side limit check: exceeding the soft limit is still
                // reported to the observer as a successful BQ execution.
                isBQSuccess = true;
                if (this.config.getMaxResults() != null && totalRows > this.config.getMaxResults()) {
                    throw new ClientTooManyRowsBQException("Max rows limit (soft limit) exceeded. Maximum is " + this.config.getMaxResults() + ", but the result has " + totalRows + " rows.", this.query);
                }
                Iterator it = this.streamResults ? res.iterateAll().iterator() : Collections.emptyIterator();
                bQResultsIterator = new BQResultsIterator<R, F>(this.query, Iterators.transform(it, this.query::mapRow), totalRows);
                jobIdStr = job == null ? "N/A" : job.getJobId().getJob();
            }
            catch (Throwable e) {
                try {
                    // exc and terminationType are assigned before throwing so that the
                    // reporting in the handler below sees the mapped failure.
                    Pair<BQException, QueryTerminationType> errDetails = this.mapQueryException(e, this.query);
                    exc = (BQException)errDetails.getLeft();
                    terminationType = (QueryTerminationType)((Object)errDetails.getRight());
                    if (terminationType == QueryTerminationType.INTERNAL_ERROR) {
                        throw new RuntimeException(exc);
                    }
                    throw exc;
                }
                catch (Throwable throwable) {
                    // Failure-path copy of the end-of-query reporting that also follows the
                    // outer try (it looks like a finally block duplicated by the decompiler).
                    // Runs for anything thrown above, then rethrows it unchanged.
                    String jobIdStr2 = job == null ? "N/A" : job.getJobId().getJob();
                    long durationMicro = qStats.end((Enum)terminationType);
                    BigQueryConnector.logQueryEnd(durationMicro, exc, jobIdStr2, qClassName, bytes, totalRows);
                    if (BigQueryConnector.this.observer != null) {
                        BigQueryConnector.this.observer.accept(new BQQueryExecutionSummary(startTime, jobIdStr2, bytes, isCacheHit, totalRows, qClassName, sql, (float)durationMicro / 1000000.0f, isBQSuccess));
                    }
                    throw throwable;
                }
            }
            // Success-path end-of-query reporting.
            long durationMicro = qStats.end((Enum)terminationType);
            BigQueryConnector.logQueryEnd(durationMicro, exc, jobIdStr, qClassName, bytes, totalRows);
            if (BigQueryConnector.this.observer != null) {
                BigQueryConnector.this.observer.accept(new BQQueryExecutionSummary(startTime, jobIdStr, bytes, isCacheHit, totalRows, qClassName, sql, (float)durationMicro / 1000000.0f, isBQSuccess));
            }
            return bQResultsIterator;
        }

        /**
         * Classifies a failure raised during query execution.
         * The checks are applied in order: no failure, client-side row limit, socket timeout
         * (as the direct cause), the BigQuery error reason, and finally a generic fallback
         * that separates unchecked failures and errors from all others.
         *
         * @param e the failure to classify, or null for none
         * @param query the query that was being executed; attached to some of the produced exceptions
         * @return a pair of the exception to report (null when {@code e} is null) and the
         *         matching termination type
         */
        private Pair<BQException, QueryTerminationType> mapQueryException(Throwable e, BQQuery<?, ?> query) {
            if (e == null) {
                return new ImmutablePair<>(null, QueryTerminationType.SUCCESS);
            }
            if (e instanceof ClientTooManyRowsBQException) {
                return new ImmutablePair<>((BQException)e, QueryTerminationType.FAILED_CLIENT_TOO_MANY_ROWS);
            }
            if (e.getCause() instanceof SocketTimeoutException) {
                return new ImmutablePair<>(new TimeoutBQException("Query timed out.", query), QueryTerminationType.FAILED_TIMEOUT);
            }
            String reason = null;
            if (e instanceof JobException) {
                JobException jobExc = (JobException)e;
                // A JobException without error entries must not break the mapping itself:
                // an exception thrown here would replace the original failure and leave the
                // end-of-query report with no exception and a SUCCESS termination type.
                if (jobExc.getErrors() != null && !jobExc.getErrors().isEmpty()) {
                    reason = ((BigQueryError)jobExc.getErrors().get(0)).getReason();
                }
            } else if (e instanceof BigQueryException) {
                reason = ((BigQueryException)e).getReason();
            }
            if (reason != null) {
                switch (reason) {
                    case "responseTooLarge": {
                        return new ImmutablePair<>(new ServerTooManyRowsBQException(e, query), QueryTerminationType.FAILED_SERVER_TOO_MANY_ROWS);
                    }
                    case "notFound": 
                    case "invalid": 
                    case "invalidQuery": {
                        return new ImmutablePair<>(new InvalidQueryBQException("Illegal query: " + e.getMessage(), query), QueryTerminationType.FAILED_INVALID_QUERY);
                    }
                    case "resourcesExceeded": {
                        return new ImmutablePair<>(new ServerResourcesBQException(e, query), QueryTerminationType.FAILED_SERVER_RESOURCES_LIMIT);
                    }
                    case "backendError": 
                    case "internalError": {
                        return new ImmutablePair<>(new UnavailableBQException("BQ server side error", e), QueryTerminationType.FAILED_SERVER_ERROR);
                    }
                    case "stopped": {
                        return new ImmutablePair<>(new TimeoutBQException("Query timed out. Job stopped.", query), QueryTerminationType.FAILED_TIMEOUT);
                    }
                }
            }
            // Unknown or missing reason: unchecked failures and errors are classified as
            // internal errors, anything else as an unspecified failure.
            if (e instanceof RuntimeException || e instanceof Error) {
                return new ImmutablePair<>(new NonTransientBQException("Internal error", e), QueryTerminationType.INTERNAL_ERROR);
            }
            return new ImmutablePair<>(new TransientBQException("Unspecified error", e), QueryTerminationType.FAILED_OTHER);
        }

        /**
         * Builds the BigQuery job configuration for this task's query.
         *
         * @param sql the SQL text to run
         * @param destinationTableRef the table to write results to, or null for none
         * @param config the execution settings (cache usage, priority, timeout, write disposition)
         * @return the job configuration, labeled with the labelized simple class name of the query
         */
        private QueryJobConfiguration buildQueryJobConfig(String sql, TableId destinationTableRef, QueryExecutionConfig config) {
            QueryJobConfiguration.Builder queryConfigBuilder = QueryJobConfiguration.newBuilder((String)sql)
                .setUseQueryCache(Boolean.valueOf(!config.isCacheDisabled()))
                .setUseLegacySql(Boolean.valueOf(this.query.isLegacySQL()))
                .setPriority(config.getPriority())
                .setJobTimeoutMs(config.getTimeoutMs());
            Map<String, QueryParameterValue> queryParameters = this.query.getQueryParameters();
            if (queryParameters != null) {
                queryConfigBuilder.setNamedParameters(queryParameters);
            }
            if (destinationTableRef != null) {
                // Writing to a destination table also switches on "allow large results".
                queryConfigBuilder.setDestinationTable(destinationTableRef).setAllowLargeResults(Boolean.valueOf(true)).setWriteDisposition(config.getWriteDisposition());
                if (log.isDebugEnabled()) {
                    log.debug("query results are saved to: " + destinationTableRef.getDataset() + "." + destinationTableRef.getTable() + " table.");
                }
            }
            BigQueryConnector.setJobConfigurationLabel(queryConfigBuilder, BigQueryConnector.labelizeName(this.query.getClass().getSimpleName()));
            return queryConfigBuilder.build();
        }
    }
}

