/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.camunda.zeebe.worker;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.worker.BackoffSupplier;
import io.camunda.zeebe.client.api.worker.JobHandler;
import io.camunda.zeebe.client.api.worker.JobWorker;
import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1;
import io.camunda.zeebe.client.api.worker.JobWorkerMetrics;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.camunda.zeebe.worker.KoraJobWorker;
import ru.tinkoff.kora.camunda.zeebe.worker.WrappedJobHandler;
import ru.tinkoff.kora.camunda.zeebe.worker.ZeebeBackoffFactory;
import ru.tinkoff.kora.camunda.zeebe.worker.ZeebeClientConfig;
import ru.tinkoff.kora.camunda.zeebe.worker.ZeebeWorkerConfig;
import ru.tinkoff.kora.camunda.zeebe.worker.telemetry.ZeebeClientWorkerMetricsFactory;
import ru.tinkoff.kora.camunda.zeebe.worker.telemetry.ZeebeWorkerTelemetry;
import ru.tinkoff.kora.camunda.zeebe.worker.telemetry.ZeebeWorkerTelemetryFactory;
import ru.tinkoff.kora.common.util.TimeUtils;

/**
 * Lifecycle component that opens one Zeebe {@link JobWorker} per enabled {@link KoraJobWorker}
 * on {@link #init()} and closes every opened worker on {@link #release()}.
 *
 * <p>Workers are opened concurrently via {@link CompletableFuture#runAsync(Runnable)}; opened
 * workers are tracked in a {@link CopyOnWriteArrayList} because they are added from those
 * asynchronous tasks.
 */
public final class KoraZeebeJobWorkerEngine implements Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger(KoraZeebeJobWorkerEngine.class);
    private final ZeebeClient client;
    private final List<KoraJobWorker> jobWorkers;
    private final ZeebeClientConfig clientConfig;
    private final ZeebeWorkerConfig workerConfig;
    private final ZeebeBackoffFactory zeebeBackoffFactory;
    private final ZeebeWorkerTelemetryFactory telemetryFactory;
    // May be null: in that case no JobWorkerMetrics is passed to the worker builder.
    private final ZeebeClientWorkerMetricsFactory zeebeMetricsFactory;
    // Workers that were successfully opened; written from async tasks during init().
    private final List<JobWorker> workers = new CopyOnWriteArrayList<>();

    /**
     * @param client              Zeebe client used to build the job workers
     * @param jobWorkers          user-declared workers to be opened
     * @param clientConfig        client configuration (source of telemetry settings)
     * @param workerConfig        worker configuration (source of per-job-type settings)
     * @param zeebeBackoffFactory factory producing the {@link BackoffSupplier} for each worker
     * @param telemetryFactory    factory producing per-worker telemetry
     * @param zeebeMetricsFactory optional factory producing {@link JobWorkerMetrics}; may be {@code null}
     */
    public KoraZeebeJobWorkerEngine(ZeebeClient client, List<KoraJobWorker> jobWorkers, ZeebeClientConfig clientConfig, ZeebeWorkerConfig workerConfig, ZeebeBackoffFactory zeebeBackoffFactory, ZeebeWorkerTelemetryFactory telemetryFactory, @Nullable ZeebeClientWorkerMetricsFactory zeebeMetricsFactory) {
        this.client = client;
        this.jobWorkers = jobWorkers;
        this.clientConfig = clientConfig;
        this.workerConfig = workerConfig;
        this.zeebeBackoffFactory = zeebeBackoffFactory;
        this.telemetryFactory = telemetryFactory;
        this.zeebeMetricsFactory = zeebeMetricsFactory;
    }

    /**
     * Opens all enabled job workers concurrently and waits until every one of them has been opened.
     * Does nothing when no workers were supplied.
     *
     * @throws IllegalStateException when a particular worker failed to start
     * @throws RuntimeException      when startup failed for any other reason
     */
    public void init() {
        if (this.jobWorkers.isEmpty()) {
            return;
        }

        logger.debug("Zeebe JobWorkers starting...");
        long started = TimeUtils.started();
        Map<String, List<KoraJobWorker>> workersByType = this.jobWorkers.stream().collect(Collectors.groupingBy(KoraJobWorker::type));
        try {
            // Several workers for the same job type are allowed, but worth a warning.
            for (List<KoraJobWorker> sameType : workersByType.values()) {
                if (sameType.size() > 1) {
                    logger.warn("Found '{}' Zeebe JobWorkers with same JobType: {}", sameType.size(), sameType.get(0).type());
                }
            }

            CompletableFuture<?>[] jobOpeners = this.jobWorkers.stream()
                .map(koraJobWorker -> CompletableFuture.runAsync(() -> this.openJobWorker(koraJobWorker)))
                .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(jobOpeners).join();

            List<String> workerNames = this.jobWorkers.stream().map(KoraJobWorker::type).toList();
            logger.info("Zeebe JobWorkers {} started in {}", workerNames, TimeUtils.tookForLogging(started));
        } catch (Exception e) {
            // allOf() completes only after every opener has finished, so the list of opened
            // workers is final here; close them so a failed startup does not leave workers running.
            this.closeOpenedWorkers();

            // join() wraps task failures into CompletionException; report the real cause.
            Throwable cause = e;
            if (e instanceof CompletionException ce && ce.getCause() != null) {
                cause = ce.getCause();
            }
            if (cause instanceof IllegalStateException ie) {
                throw ie;
            }
            throw new RuntimeException("Zeebe JobWorkers with types %s failed to start, due to: %s".formatted(workersByType.keySet(), cause.getMessage()), cause);
        }
    }

    /**
     * Closes every worker opened by {@link #init()}. A failure to close one worker is logged
     * and does not prevent the remaining workers from being closed.
     */
    public void release() {
        if (this.workers.isEmpty()) {
            return;
        }

        logger.debug("Zeebe JobWorkers stopping...");
        long started = TimeUtils.started();
        this.closeOpenedWorkers();
        List<String> workerNames = this.jobWorkers.stream().map(KoraJobWorker::type).toList();
        logger.info("Zeebe JobWorkers {} stopped in {}", workerNames, TimeUtils.tookForLogging(started));
    }

    /**
     * Opens a single worker if its job configuration is enabled and records it as opened.
     *
     * @throws IllegalStateException wrapping any failure, with the worker type and reason in the message
     */
    private void openJobWorker(KoraJobWorker koraJobWorker) {
        try {
            ZeebeWorkerConfig.JobConfig jobConfig = this.workerConfig.getJobConfig(koraJobWorker.type());
            if (jobConfig.enabled().booleanValue()) {
                this.workers.add(this.createJobWorker(koraJobWorker, jobConfig));
            }
        } catch (Exception e) {
            throw new IllegalStateException("Zeebe JobWorker '%s' failed to start, due to: %s".formatted(koraJobWorker.type(), e.getMessage()), e);
        }
    }

    /** Closes all tracked workers on a best-effort basis and forgets them afterwards. */
    private void closeOpenedWorkers() {
        for (JobWorker worker : this.workers) {
            try {
                worker.close();
            } catch (Exception e) {
                // Best-effort shutdown: keep closing the rest, but do not hide the failure.
                logger.warn("Zeebe JobWorker failed to close, due to: {}", e.getMessage(), e);
            }
        }
        this.workers.clear();
    }

    /**
     * Builds and opens a Zeebe {@link JobWorker} for the given Kora worker using its job configuration.
     *
     * @param worker    worker definition providing job type, handler logic and variables to fetch
     * @param jobConfig configuration applied to the worker builder
     * @return the opened job worker
     */
    private JobWorker createJobWorker(KoraJobWorker worker, ZeebeWorkerConfig.JobConfig jobConfig) {
        ZeebeWorkerTelemetry telemetry = this.telemetryFactory.get(worker.type(), this.clientConfig.telemetry());
        WrappedJobHandler jobHandler = new WrappedJobHandler(telemetry, worker);
        BackoffSupplier backoffSupplier = this.zeebeBackoffFactory.build(jobConfig.backoff());
        JobWorkerMetrics jobWorkerMetrics = this.zeebeMetricsFactory == null
            ? null
            : this.zeebeMetricsFactory.get(worker.type(), this.clientConfig.telemetry().metrics());

        JobWorkerBuilderStep1.JobWorkerBuilderStep3 builder = this.client.newWorker()
            .jobType(worker.type())
            .handler((JobHandler) jobHandler)
            .name(jobConfig.name())
            .metrics(jobWorkerMetrics)
            .fetchVariables(worker.fetchVariables())
            .backoffSupplier(backoffSupplier)
            .timeout(jobConfig.timeout())
            .pollInterval(jobConfig.pollInterval())
            .requestTimeout(jobConfig.requestTimeout())
            .streamTimeout(jobConfig.streamTimeout());

        // Optional settings are applied only when configured.
        if (jobConfig.streamEnabled() != null) {
            builder = builder.streamEnabled(jobConfig.streamEnabled().booleanValue());
        }
        if (jobConfig.maxJobsActive() != null) {
            builder = builder.maxJobsActive(jobConfig.maxJobsActive().intValue());
        }
        if (jobConfig.tenantIds() != null && !jobConfig.tenantIds().isEmpty()) {
            builder = builder.tenantIds(jobConfig.tenantIds());
        }
        return builder.open();
    }
}

