/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.s3.client.aws;

import jakarta.annotation.Nullable;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jetbrains.annotations.ApiStatus;
import org.reactivestreams.Publisher;
import reactor.adapter.JdkFlowAdapter;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.s3.client.S3DeleteException;
import ru.tinkoff.kora.s3.client.S3Exception;
import ru.tinkoff.kora.s3.client.S3KoraAsyncClient;
import ru.tinkoff.kora.s3.client.S3NotFoundException;
import ru.tinkoff.kora.s3.client.aws.AwsS3ClientConfig;
import ru.tinkoff.kora.s3.client.aws.AwsS3Object;
import ru.tinkoff.kora.s3.client.aws.AwsS3ObjectList;
import ru.tinkoff.kora.s3.client.aws.AwsS3ObjectMeta;
import ru.tinkoff.kora.s3.client.aws.AwsS3ObjectMetaList;
import ru.tinkoff.kora.s3.client.aws.AwsS3ObjectUpload;
import ru.tinkoff.kora.s3.client.model.ByteS3Body;
import ru.tinkoff.kora.s3.client.model.PublisherS3Body;
import ru.tinkoff.kora.s3.client.model.S3Body;
import ru.tinkoff.kora.s3.client.model.S3Object;
import ru.tinkoff.kora.s3.client.model.S3ObjectList;
import ru.tinkoff.kora.s3.client.model.S3ObjectMeta;
import ru.tinkoff.kora.s3.client.model.S3ObjectMetaList;
import ru.tinkoff.kora.s3.client.model.S3ObjectUpload;
import ru.tinkoff.kora.s3.client.telemetry.S3KoraClientTelemetry;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.ResponsePublisher;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration;

/**
 * {@link S3KoraAsyncClient} implementation that delegates to the AWS SDK {@link S3AsyncClient}.
 *
 * <p>Every public operation is wrapped with telemetry: a telemetry context is obtained before the
 * operation is observed and closed when the resulting stage completes. Failures are converted to
 * {@link S3Exception} (or one of its subtypes) by {@link #handleException(Throwable)}.
 */
@ApiStatus.Experimental
public class AwsS3KoraAsyncClient
implements S3KoraAsyncClient {
    private final S3AsyncClient asyncClient;
    private final S3AsyncClient multipartAsyncClient;
    private final ExecutorService awsExecutor;
    private final S3KoraClientTelemetry telemetry;
    private final AwsS3ClientConfig awsS3ClientConfig;

    /**
     * Creates the client and derives a multipart-capable client from {@code asyncClient}.
     *
     * <p>The multipart client uses {@code upload().partSize()} both as the multipart threshold and
     * as the minimum part size, and {@code upload().bufferSize()} as the API call buffer size.
     *
     * @param asyncClient       AWS client used for all non-multipart requests
     * @param awsExecutor       executor handed to the SDK when reading {@link java.io.InputStream} bodies
     * @param telemetry         telemetry factory queried once per operation
     * @param awsS3ClientConfig configuration providing the upload part and buffer sizes
     */
    public AwsS3KoraAsyncClient(S3AsyncClient asyncClient, ExecutorService awsExecutor, S3KoraClientTelemetry telemetry, AwsS3ClientConfig awsS3ClientConfig) {
        this.asyncClient = asyncClient;
        this.awsExecutor = awsExecutor;
        this.telemetry = telemetry;
        this.awsS3ClientConfig = awsS3ClientConfig;

        final long partSize = awsS3ClientConfig.upload().partSize().toBytes();
        final MultipartConfiguration multipartConfiguration = MultipartConfiguration.builder()
            .thresholdInBytes(partSize)
            .apiCallBufferSizeInBytes(awsS3ClientConfig.upload().bufferSize().toBytes())
            .minimumPartSizeInBytes(partSize)
            .build();
        this.multipartAsyncClient = MultipartS3AsyncClient.create(asyncClient, multipartConfiguration);
    }

    /** Issues a GetObject request and exposes the response as a publisher-backed {@link S3Object}. */
    private CompletionStage<S3Object> getInternal(String bucket, String key) {
        final GetObjectRequest request = GetObjectRequest.builder()
            .bucket(bucket)
            .key(key)
            .build();
        return this.asyncClient.getObject(request, AsyncResponseTransformer.toPublisher())
            .thenApply(publisher -> new AwsS3Object(request.key(), publisher));
    }

    /**
     * Fetches a single object.
     *
     * @param bucket bucket name
     * @param key    object key
     * @return stage completing with the object, or failing with an {@link S3Exception}
     */
    public CompletionStage<S3Object> get(String bucket, String key) {
        return wrapWithTelemetry(this.getInternal(bucket, key), () -> this.telemetry.get("GetObject", bucket, key, null));
    }

    /** Issues a GetObjectAttributes request asking only for the object size attribute. */
    private CompletionStage<S3ObjectMeta> getMetaInternal(String bucket, String key) {
        final GetObjectAttributesRequest request = GetObjectAttributesRequest.builder()
            .bucket(bucket)
            .key(key)
            .objectAttributes(ObjectAttributes.OBJECT_SIZE)
            .build();
        return this.asyncClient.getObjectAttributes(request)
            .thenApply(response -> new AwsS3ObjectMeta(key, response));
    }

    /**
     * Fetches metadata of a single object.
     *
     * @param bucket bucket name
     * @param key    object key
     * @return stage completing with the metadata, or failing with an {@link S3Exception}
     */
    public CompletionStage<S3ObjectMeta> getMeta(String bucket, String key) {
        return wrapWithTelemetry(this.getMetaInternal(bucket, key), () -> this.telemetry.get("GetObjectMeta", bucket, key, null));
    }

    /**
     * Fetches several objects; one request is started per key before any result is awaited.
     *
     * @param bucket bucket name
     * @param keys   object keys
     * @return stage completing with the objects in the iteration order of {@code keys}, or failing
     *         with an {@link S3Exception} if any request fails
     */
    public CompletionStage<List<S3Object>> get(String bucket, Collection<String> keys) {
        final List<CompletableFuture<S3Object>> futures = keys.stream()
            .map(key -> this.getInternal(bucket, key).toCompletableFuture())
            .toList();
        // Exception mapping is applied once, inside wrapWithTelemetry.
        return wrapWithTelemetry(joinAll(futures), () -> this.telemetry.get("GetObjects", bucket, null, null));
    }

    /**
     * Fetches metadata of several objects; one request is started per key before any result is awaited.
     *
     * @param bucket bucket name
     * @param keys   object keys
     * @return stage completing with the metadata in the iteration order of {@code keys}, or failing
     *         with an {@link S3Exception} if any request fails
     */
    public CompletionStage<List<S3ObjectMeta>> getMeta(String bucket, Collection<String> keys) {
        final List<CompletableFuture<S3ObjectMeta>> futures = keys.stream()
            .map(key -> this.getMetaInternal(bucket, key).toCompletableFuture())
            .toList();
        // Exception mapping is applied once, inside wrapWithTelemetry.
        return wrapWithTelemetry(joinAll(futures), () -> this.telemetry.get("GetObjectMetas", bucket, null, null));
    }

    /**
     * Lists objects under a prefix and fetches each listed object.
     *
     * @param bucket    bucket name
     * @param prefix    key prefix
     * @param delimiter optional delimiter passed to the list request
     * @param limit     value passed as {@code maxKeys} of the list request
     * @return stage completing with the listing and the fetched objects
     */
    public CompletionStage<S3ObjectList> list(String bucket, String prefix, @Nullable String delimiter, int limit) {
        return wrapWithTelemetry(
            (Context fork) -> this.listInternal(bucket, prefix, delimiter, limit, fork),
            () -> this.telemetry.get("ListObjects", bucket, prefix, null));
    }

    /**
     * Lists metadata under a prefix, then starts a GetObject request for every listed key.
     *
     * <p>The given {@code context} is injected while the per-object requests are started and the
     * current context is cleared afterwards, because the continuation may run on a thread other
     * than the caller's.
     */
    private CompletionStage<S3ObjectList> listInternal(String bucket, String prefix, @Nullable String delimiter, int limit, Context context) {
        return this.listMetaInternal(bucket, prefix, delimiter, limit).thenCompose(metaList -> {
            try {
                context.inject();
                final AwsS3ObjectMetaList awsMetaList = (AwsS3ObjectMetaList) metaList;
                final List<CompletableFuture<S3Object>> futures = metaList.metas().stream()
                    .map(meta -> this.getInternal(bucket, meta.key()).toCompletableFuture())
                    .toList();
                // The result list is copied into an ArrayList, matching the list type handed to
                // AwsS3ObjectList before this refactoring.
                return joinAll(futures)
                    .<S3ObjectList>thenApply(objects -> new AwsS3ObjectList(awsMetaList.response(), new ArrayList<>(objects)));
            } finally {
                Context.clear();
            }
        });
    }

    /** Issues a ListObjectsV2 request with {@code limit} as {@code maxKeys}. */
    private CompletionStage<S3ObjectMetaList> listMetaInternal(String bucket, String prefix, @Nullable String delimiter, int limit) {
        final ListObjectsV2Request request = ListObjectsV2Request.builder()
            .bucket(bucket)
            .prefix(prefix)
            .maxKeys(limit)
            .delimiter(delimiter)
            .build();
        return this.asyncClient.listObjectsV2(request)
            .thenApply(response -> new AwsS3ObjectMetaList(response));
    }

    /**
     * Lists object metadata under a prefix.
     *
     * @param bucket    bucket name
     * @param prefix    key prefix
     * @param delimiter optional delimiter passed to the list request
     * @param limit     value passed as {@code maxKeys} of the list request
     * @return stage completing with the listing
     */
    public CompletionStage<S3ObjectMetaList> listMeta(String bucket, String prefix, @Nullable String delimiter, int limit) {
        return wrapWithTelemetry(this.listMetaInternal(bucket, prefix, delimiter, limit), () -> this.telemetry.get("ListObjectMetas", bucket, prefix, null));
    }

    /**
     * Lists and fetches objects for several prefixes; one listing is started per prefix.
     *
     * @param bucket         bucket name
     * @param prefixes       key prefixes
     * @param delimiter      optional delimiter passed to every list request
     * @param limitPerPrefix value passed as {@code maxKeys} of every list request
     * @return stage completing with one listing per prefix, in the iteration order of {@code prefixes}
     */
    public CompletionStage<List<S3ObjectList>> list(String bucket, Collection<String> prefixes, @Nullable String delimiter, int limitPerPrefix) {
        return wrapWithTelemetry((Context fork) -> {
            final List<CompletableFuture<S3ObjectList>> futures = prefixes.stream()
                .map(prefix -> this.listInternal(bucket, prefix, delimiter, limitPerPrefix, fork).toCompletableFuture())
                .toList();
            return joinAll(futures);
        }, () -> this.telemetry.get("ListMultiObjects", bucket, null, null));
    }

    /**
     * Lists object metadata for several prefixes; one listing is started per prefix.
     *
     * @param bucket         bucket name
     * @param prefixes       key prefixes
     * @param delimiter      optional delimiter passed to every list request
     * @param limitPerPrefix value passed as {@code maxKeys} of every list request
     * @return stage completing with one listing per prefix, in the iteration order of {@code prefixes}
     */
    public CompletionStage<List<S3ObjectMetaList>> listMeta(String bucket, Collection<String> prefixes, @Nullable String delimiter, int limitPerPrefix) {
        return wrapWithTelemetry((Context fork) -> {
            final List<CompletableFuture<S3ObjectMetaList>> futures = prefixes.stream()
                .map(prefix -> this.listMetaInternal(bucket, prefix, delimiter, limitPerPrefix).toCompletableFuture())
                .toList();
            return joinAll(futures);
        }, () -> this.telemetry.get("ListMultiObjectMetas", bucket, null, null));
    }

    /**
     * Uploads an object.
     *
     * <p>The content length is set on the request, and reported to telemetry, only when
     * {@code body.size()} is positive.
     *
     * @param bucket bucket name
     * @param key    object key
     * @param body   content to upload together with its type and encoding
     * @return stage completing with the upload result, or failing with an {@link S3Exception}
     */
    public CompletionStage<S3ObjectUpload> put(String bucket, String key, S3Body body) {
        // NOTE(review): size() is read once here and assumed to be a stable accessor.
        final long length = body.size();
        final Long knownSize = length > 0L ? Long.valueOf(length) : null;

        final PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder()
            .bucket(bucket)
            .key(key)
            .contentType(body.type())
            .contentEncoding(body.encoding());
        if (knownSize != null) {
            requestBuilder.contentLength(knownSize);
        }
        final PutObjectRequest request = requestBuilder.build();

        return wrapWithTelemetry(
            (Context fork) -> this.putInternal(request, body, knownSize),
            () -> this.telemetry.get("PutObject", bucket, key, knownSize));
    }

    /**
     * Chooses the request body representation and the client for an upload, then sends it.
     *
     * <p>Byte and publisher bodies always go through the regular client. Any other body is read as
     * an input stream: through the regular client when its size is known and does not exceed the
     * configured part size, otherwise through the multipart client.
     *
     * @param knownSize positive body size, or {@code null} when the size is not known
     */
    private CompletionStage<S3ObjectUpload> putInternal(PutObjectRequest request, S3Body body, @Nullable Long knownSize) {
        final S3AsyncClient client;
        final AsyncRequestBody requestBody;
        if (body instanceof ByteS3Body byteBody) {
            client = this.asyncClient;
            requestBody = AsyncRequestBody.fromBytes(byteBody.bytes());
        } else if (body instanceof PublisherS3Body) {
            client = this.asyncClient;
            requestBody = AsyncRequestBody.fromPublisher(JdkFlowAdapter.flowPublisherToFlux(body.asPublisher()));
        } else {
            final boolean fitsSinglePart = knownSize != null
                && knownSize <= this.awsS3ClientConfig.upload().partSize().toBytes();
            client = fitsSinglePart ? this.asyncClient : this.multipartAsyncClient;
            requestBody = AsyncRequestBody.fromInputStream(body.asInputStream(), knownSize, this.awsExecutor);
        }
        return client.putObject(request, requestBody).thenApply(AwsS3ObjectUpload::new);
    }

    /**
     * Deletes a single object.
     *
     * @param bucket bucket name
     * @param key    object key
     * @return stage completing with {@code null} on success, or failing with an {@link S3Exception}
     */
    public CompletionStage<Void> delete(String bucket, String key) {
        final DeleteObjectRequest request = DeleteObjectRequest.builder()
            .bucket(bucket)
            .key(key)
            .build();
        final CompletionStage<Void> operation = this.asyncClient.deleteObject(request).thenAccept(response -> {});
        return wrapWithTelemetry(operation, () -> this.telemetry.get("DeleteObject", bucket, key, null));
    }

    /**
     * Deletes several objects with a single DeleteObjects request.
     *
     * @param bucket bucket name
     * @param keys   object keys
     * @return stage completing with {@code null} on success, or failing with
     *         {@link S3DeleteException} when the response reports per-key errors
     */
    public CompletionStage<Void> delete(String bucket, Collection<String> keys) {
        final List<ObjectIdentifier> identifiers = keys.stream()
            .map(key -> ObjectIdentifier.builder().key(key).build())
            .toList();
        final DeleteObjectsRequest request = DeleteObjectsRequest.builder()
            .bucket(bucket)
            .delete(Delete.builder().objects(identifiers).build())
            .build();
        final CompletionStage<Void> operation = this.asyncClient.deleteObjects(request).thenAccept(response -> {
            if (response.hasErrors()) {
                final List<S3DeleteException.Error> errors = response.errors().stream()
                    .map(error -> new S3DeleteException.Error(error.key(), bucket, error.code(), error.message()))
                    .toList();
                throw new S3DeleteException(errors);
            }
        });
        return wrapWithTelemetry(operation, () -> this.telemetry.get("DeleteObjects", bucket, null, null));
    }

    /**
     * Combines futures into one that completes with all results, in the order of {@code futures}.
     * The combined future fails if any of the given futures fails.
     */
    private static <T> CompletableFuture<List<T>> joinAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toList());
    }

    /** Telemetry wrapper for an operation that has already been started by the caller. */
    private static <T> CompletionStage<T> wrapWithTelemetry(CompletionStage<T> operationSupplier, Supplier<S3KoraClientTelemetry.S3KoraClientTelemetryContext> contextSupplier) {
        return wrapWithTelemetry((Context ignored) -> operationSupplier, contextSupplier);
    }

    /**
     * Runs an operation inside a forked {@link Context} and records its outcome in telemetry.
     *
     * <p>The fork is injected before the telemetry context is obtained and the operation is
     * started; the caller's context is re-injected before this method returns. The returned stage
     * fails only with exceptions produced by {@link #handleException(Throwable)}.
     *
     * @param operationSupplier starts the operation, receiving the forked context
     * @param contextSupplier   creates the telemetry context for this operation
     */
    private static <T> CompletionStage<T> wrapWithTelemetry(Function<Context, CompletionStage<T>> operationSupplier, Supplier<S3KoraClientTelemetry.S3KoraClientTelemetryContext> contextSupplier) {
        final Context ctx = Context.current();
        try {
            final Context fork = ctx.fork();
            fork.inject();
            final S3KoraClientTelemetry.S3KoraClientTelemetryContext telemetryContext = contextSupplier.get();

            final CompletionStage<T> operation;
            try {
                operation = operationSupplier.apply(fork);
            } catch (RuntimeException e) {
                // The operation failed before producing a stage: close the telemetry context so it
                // is not left open, then let the caller see the failure exactly as before.
                telemetryContext.close(handleException(e));
                throw e;
            }

            return operation
                .exceptionallyCompose(AwsS3KoraAsyncClient::handleExceptionStage)
                .whenComplete((result, error) -> {
                    if (error != null) {
                        telemetryContext.close(handleException(error));
                    } else {
                        telemetryContext.close();
                    }
                });
        } finally {
            ctx.inject();
        }
    }

    /** Returns an already-failed stage carrying {@code e} converted by {@link #handleException(Throwable)}. */
    private static <T> CompletionStage<T> handleExceptionStage(Throwable e) {
        return CompletableFuture.failedFuture(handleException(e));
    }

    /**
     * Converts an arbitrary failure into an {@link S3Exception}.
     *
     * <p>{@link CompletionException} wrappers are unwrapped while they carry a cause. An existing
     * {@link S3Exception} is returned as is; AWS "no such key" / "no such bucket" errors become
     * {@link S3NotFoundException}; other AWS service errors keep their error code and message.
     * When an AWS exception carries no error details, its own message and simple class name are
     * used instead. Anything else is wrapped using its simple class name as the code.
     */
    private static S3Exception handleException(Throwable e) {
        Throwable cause = e;
        // A CompletionException without a cause is kept as the failure itself.
        while (cause instanceof CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }

        if (cause instanceof S3Exception s3Exception) {
            return s3Exception;
        }
        if (cause instanceof AwsServiceException awsException) {
            // awsErrorDetails() is optional on the SDK exception, so guard against null.
            final var details = awsException.awsErrorDetails();
            final String message = details != null ? details.errorMessage() : awsException.getMessage();
            if (awsException instanceof NoSuchKeyException) {
                return S3NotFoundException.ofNoSuchKey(awsException, message);
            }
            if (awsException instanceof NoSuchBucketException) {
                return S3NotFoundException.ofNoSuchBucket(awsException, message);
            }
            final String code = details != null ? details.errorCode() : awsException.getClass().getSimpleName();
            return new S3Exception(awsException, code, message);
        }
        return new S3Exception(cause, cause.getClass().getSimpleName(), cause.getMessage());
    }
}

