/*
 * Decompiled with CFR 0.152.
 */
package org.projectnessie.versioned.gc.actions;

import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.time.Clock;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import org.projectnessie.server.store.TableCommitMetaStoreWorker;
import org.projectnessie.versioned.Serializer;
import org.projectnessie.versioned.StoreWorker;
import org.projectnessie.versioned.dynamodb.DynamoStore;
import org.projectnessie.versioned.dynamodb.DynamoStoreConfig;
import org.projectnessie.versioned.gc.AssetKeyConverter;
import org.projectnessie.versioned.gc.AssetKeySerializer;
import org.projectnessie.versioned.gc.IcebergAssetKey;
import org.projectnessie.versioned.gc.IcebergAssetKeyConverter;
import org.projectnessie.versioned.gc.IdentifyUnreferencedAssets;
import org.projectnessie.versioned.gc.ValueTypeFilter;
import org.projectnessie.versioned.gc.actions.GcActionUtils;
import org.projectnessie.versioned.gc.actions.GcActionsConfig;
import org.projectnessie.versioned.store.Store;
import org.projectnessie.versioned.tiered.gc.GcOptions;
import org.projectnessie.versioned.tiered.gc.IdentifyUnreferencedValues;
import scala.collection.Iterable;
import software.amazon.awssdk.regions.Region;

public class GcActions {
    public static final TableIdentifier DEFAULT_TABLE_IDENTIFIER = TableIdentifier.parse((String)"gc.identified_tables");
    private static final StructType SCHEMA = SparkSchemaUtil.convert((Schema)new Schema(Types.StructType.of((Types.NestedField[])new Types.NestedField[]{Types.NestedField.required((int)1, (String)"tableName", (Type)Types.StringType.get()), Types.NestedField.required((int)2, (String)"timestamp", (Type)Types.TimestampType.withZone()), Types.NestedField.required((int)3, (String)"asset", (Type)Types.BinaryType.get()), Types.NestedField.required((int)4, (String)"snapshotId", (Type)Types.StringType.get()), Types.NestedField.required((int)5, (String)"assetType", (Type)Types.StringType.get()), Types.NestedField.required((int)6, (String)"path", (Type)Types.StringType.get()), Types.NestedField.required((int)7, (String)"name", (Type)Types.StringType.get()), Types.NestedField.required((int)8, (String)"runid", (Type)Types.LongType.get())}).fields()));
    private final TableCommitMetaStoreWorker worker = new TableCommitMetaStoreWorker();
    private final Clock clock = Clock.systemUTC();
    private final SparkSession spark;
    private final AssetKeySerializer assetKeySerializer;
    private final IcebergAssetKeyConverter assetKeyConverter;
    private final GcActionsConfig actionsConfig;
    private final GcOptions gcConfig;
    private final TableIdentifier table;

    private GcActions(SparkSession spark, GcActionsConfig actionsConfig, GcOptions gcConfig, TableIdentifier table) {
        this.spark = spark;
        this.actionsConfig = actionsConfig;
        this.gcConfig = gcConfig;
        this.table = table;
        SparkConf conf = new SparkConf();
        conf.setAll((Iterable)spark.sessionState().conf().getAllConfs());
        Configuration hadoopConfig = spark.sessionState().newHadoopConf();
        SerializableConfiguration configuration = new SerializableConfiguration(hadoopConfig);
        this.assetKeySerializer = new AssetKeySerializer(configuration);
        this.assetKeyConverter = new IcebergAssetKeyConverter(configuration);
        this.createTable(table);
    }

    private SparkSession spark() {
        return this.spark;
    }

    public Dataset<Row> identifyUnreferencedAssets() throws AnalysisException {
        IdentifyUnreferencedValues values = new IdentifyUnreferencedValues((StoreWorker)this.worker, this.store(this.actionsConfig), this.spark(), this.gcConfig, this.clock);
        Dataset unreferencedValues = values.identify();
        IdentifyUnreferencedAssets assets = new IdentifyUnreferencedAssets((Serializer)this.worker.getValueSerializer(), (Serializer)this.assetKeySerializer, (AssetKeyConverter)this.assetKeyConverter, (FilterFunction)new ValueTypeFilter((Serializer)this.worker.getValueSerializer()), this.spark());
        Dataset unreferencedAssets = assets.identify(unreferencedValues);
        long currentRunId = GcActionUtils.getMaxRunId(this.spark, this.table.toString()) + 1L;
        return unreferencedAssets.map((MapFunction)new ConvertToTableFunction(this.assetKeySerializer), (Encoder)RowEncoder.apply((StructType)SCHEMA)).withColumn("runid", functions.lit((Object)currentRunId));
    }

    public void updateUnreferencedAssetTable(Dataset<Row> unreferencedAssets) {
        unreferencedAssets.repartition(new Column[]{unreferencedAssets.col("tableName")}).sortWithinPartitions(new Column[0]).write().format("iceberg").mode("append").save(this.table.toString());
    }

    static DynamoStore createStore(GcActionsConfig config) {
        return new DynamoStore((DynamoStoreConfig)DynamoStoreConfig.builder().endpoint(Optional.ofNullable(config.getDynamoEndpoint()).map(e -> {
            try {
                return new URI((String)e);
            }
            catch (URISyntaxException ex) {
                throw new RuntimeException(ex);
            }
        })).region(Region.of((String)config.getDynamoRegion())).build());
    }

    private Supplier<Store> store(GcActionsConfig config) {
        if (config.getStoreType() != GcActionsConfig.StoreType.DYNAMO) {
            throw new UnsupportedOperationException("Ony dynamo tiered store is supported");
        }
        DynamoStore store = GcActions.createStore(config);
        store.start();
        return () -> GcActions.lambda$store$1((Store)store);
    }

    private void createTable(TableIdentifier tableIdentifier) {
        CatalogPlugin catalog = this.spark.sessionState().catalogManager().currentCatalog();
        Identifier ident = Identifier.of((String[])tableIdentifier.namespace().levels(), (String)tableIdentifier.name());
        Transform[] partitions = Spark3Util.toTransforms((PartitionSpec)PartitionSpec.builderFor((Schema)SparkSchemaUtil.convert((StructType)SCHEMA)).identity("tableName").build());
        try {
            ((TableCatalog)catalog).createTable(ident, SCHEMA, partitions, (Map)ImmutableMap.of());
        }
        catch (TableAlreadyExistsException e) {
            try {
                if (!((TableCatalog)catalog).loadTable(ident).schema().equals((Object)SCHEMA)) {
                    throw new RuntimeException(String.format("Cannot create table %s. Table with different schema already exists", ident), e);
                }
            }
            catch (NoSuchTableException noSuchTableException) {}
        }
        catch (NoSuchNamespaceException e) {
            throw new RuntimeException(String.format("Cannot create table. Are you using a Nessie Catalog. Catalog is %s", catalog.getClass().getName()), e);
        }
    }

    private static /* synthetic */ Store lambda$store$1(Store store) {
        return store;
    }

    public static class Builder {
        private final SparkSession spark;
        private GcActionsConfig actionsConfig;
        private GcOptions gcOptions;
        private TableIdentifier table;

        public Builder(SparkSession spark) {
            this.spark = spark;
        }

        public Builder setActionsConfig(GcActionsConfig actionsConfig) {
            this.actionsConfig = actionsConfig;
            return this;
        }

        public Builder setGcOptions(GcOptions gcOptions) {
            this.gcOptions = gcOptions;
            return this;
        }

        public Builder setTable(TableIdentifier table) {
            this.table = table;
            return this;
        }

        public GcActions build() {
            return new GcActions(this.spark, this.actionsConfig, this.gcOptions, this.table);
        }
    }

    private static class ConvertToTableFunction
    implements MapFunction<IdentifyUnreferencedAssets.UnreferencedItem, Row> {
        private final AssetKeySerializer assetKeySerializer;

        private ConvertToTableFunction(AssetKeySerializer assetKeySerializer) {
            this.assetKeySerializer = assetKeySerializer;
        }

        public Row call(IdentifyUnreferencedAssets.UnreferencedItem value) throws Exception {
            IcebergAssetKey assetKey = (IcebergAssetKey)this.assetKeySerializer.fromBytes(ByteString.copyFrom((byte[])value.getAsset()));
            List key = value.getKey();
            long microTimestamp = value.getTimestamp();
            long secondTimestamp = TimeUnit.MICROSECONDS.toSeconds(microTimestamp);
            long nanos = microTimestamp * 1000L - secondTimestamp * 1000000000L;
            Timestamp timestamp = Timestamp.from(Instant.ofEpochSecond(secondTimestamp, nanos));
            return RowFactory.create((Object[])new Object[]{String.join((CharSequence)".", key), timestamp, value.getAsset(), assetKey.getSnapshotId(), assetKey.getType().toString(), assetKey.getPath(), String.join((CharSequence)".", assetKey.toReportableName())});
        }
    }
}

