/*
 * Decompiled with CFR 0.152.
 */
package org.camunda.bpm.engine.rest.impl;

import jakarta.servlet.ServletContext;
import jakarta.servlet.ServletContextEvent;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.camunda.bpm.engine.IdentityService;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.externaltask.ExternalTaskQueryTopicBuilder;
import org.camunda.bpm.engine.impl.ProcessEngineImpl;
import org.camunda.bpm.engine.impl.identity.Authentication;
import org.camunda.bpm.engine.impl.util.ClockUtil;
import org.camunda.bpm.engine.impl.util.SingleConsumerCondition;
import org.camunda.bpm.engine.rest.dto.externaltask.FetchExternalTasksExtendedDto;
import org.camunda.bpm.engine.rest.dto.externaltask.LockedExternalTaskDto;
import org.camunda.bpm.engine.rest.exception.InvalidRequestException;
import org.camunda.bpm.engine.rest.exception.RestException;
import org.camunda.bpm.engine.rest.impl.FetchAndLockRequest;
import org.camunda.bpm.engine.rest.impl.FetchAndLockResult;
import org.camunda.bpm.engine.rest.spi.FetchAndLockHandler;
import org.camunda.bpm.engine.rest.util.EngineUtil;

public class FetchAndLockHandlerImpl
implements Runnable,
FetchAndLockHandler {
    private static final Logger LOG = Logger.getLogger(FetchAndLockHandlerImpl.class.getName());
    protected static final String UNIQUE_WORKER_REQUEST_PARAM_NAME = "fetch-and-lock-unique-worker-request";
    protected static final long PENDING_REQUEST_FETCH_INTERVAL = 30000L;
    protected static final long MAX_BACK_OFF_TIME = Long.MAX_VALUE;
    protected static final long MAX_REQUEST_TIMEOUT = 1800000L;
    protected SingleConsumerCondition condition;
    protected BlockingQueue<FetchAndLockRequest> queue = new ArrayBlockingQueue<FetchAndLockRequest>(200);
    protected List<FetchAndLockRequest> pendingRequests = new ArrayList<FetchAndLockRequest>();
    protected List<FetchAndLockRequest> newRequests = new ArrayList<FetchAndLockRequest>();
    protected Thread handlerThread = new Thread((Runnable)this, this.getClass().getSimpleName());
    protected volatile boolean isRunning = false;
    protected boolean isUniqueWorkerRequest = false;

    public FetchAndLockHandlerImpl() {
        this.condition = new SingleConsumerCondition(this.handlerThread);
    }

    @Override
    public void run() {
        while (this.isRunning) {
            try {
                this.acquire();
            }
            catch (Exception exception) {}
        }
        this.rejectPendingRequests();
    }

    protected void acquire() {
        LOG.log(Level.FINEST, "Acquire start");
        this.queue.drainTo(this.newRequests);
        if (!this.newRequests.isEmpty()) {
            if (this.isUniqueWorkerRequest) {
                this.removeDuplicates();
            }
            this.pendingRequests.addAll(this.newRequests);
            this.newRequests.clear();
        }
        LOG.log(Level.FINEST, "Number of pending requests {0}", this.pendingRequests.size());
        long backoffTime = Long.MAX_VALUE;
        Iterator<FetchAndLockRequest> iterator = this.pendingRequests.iterator();
        while (iterator.hasNext()) {
            FetchAndLockRequest pendingRequest = iterator.next();
            LOG.log(Level.FINEST, "Fetching tasks for request {0}", pendingRequest);
            FetchAndLockResult result = this.tryFetchAndLock(pendingRequest);
            LOG.log(Level.FINEST, "Fetch and lock result: {0}", result);
            if (result.wasSuccessful()) {
                List<LockedExternalTaskDto> lockedTasks = result.getTasks();
                if (!lockedTasks.isEmpty() || this.isExpired(pendingRequest)) {
                    AsyncResponse asyncResponse = pendingRequest.getAsyncResponse();
                    asyncResponse.resume(lockedTasks);
                    LOG.log(Level.FINEST, "resume and remove request with {0}", lockedTasks);
                    iterator.remove();
                    continue;
                }
                long msUntilTimeout = pendingRequest.getTimeoutTimestamp() - ClockUtil.getCurrentTime().getTime();
                backoffTime = Math.min(backoffTime, msUntilTimeout);
                continue;
            }
            AsyncResponse asyncResponse = pendingRequest.getAsyncResponse();
            Throwable processEngineException = result.getThrowable();
            asyncResponse.resume(processEngineException);
            LOG.log(Level.FINEST, "Resume and remove request with error", processEngineException);
            iterator.remove();
        }
        long waitTime = Math.max(0L, backoffTime);
        if (this.pendingRequests.isEmpty()) {
            this.suspend(waitTime);
        } else {
            this.suspend(Math.min(30000L, waitTime));
        }
    }

    protected void removeDuplicates() {
        for (FetchAndLockRequest newRequest : this.newRequests) {
            Iterator<FetchAndLockRequest> iterator = this.pendingRequests.iterator();
            while (iterator.hasNext()) {
                FetchAndLockRequest pendingRequest = iterator.next();
                if (!pendingRequest.getDto().getWorkerId().equals(newRequest.getDto().getWorkerId())) continue;
                AsyncResponse asyncResponse = pendingRequest.getAsyncResponse();
                asyncResponse.cancel();
                iterator.remove();
            }
        }
    }

    @Override
    public void start() {
        if (this.isRunning) {
            return;
        }
        this.isRunning = true;
        this.handlerThread.start();
        ProcessEngineImpl.EXT_TASK_CONDITIONS.addConsumer(this.condition);
    }

    @Override
    public void shutdown() {
        try {
            ProcessEngineImpl.EXT_TASK_CONDITIONS.removeConsumer(this.condition);
        }
        finally {
            this.isRunning = false;
            this.condition.signal();
        }
        try {
            this.handlerThread.join();
        }
        catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Shutting down the handler thread failed", e);
        }
    }

    protected void suspend(long millis) {
        if (millis <= 0L) {
            return;
        }
        this.suspendAcquisition(millis);
    }

    protected void suspendAcquisition(long millis) {
        try {
            if (this.queue.isEmpty() && this.isRunning) {
                LOG.log(Level.FINEST, "Suspend acquisition for {0}ms", millis);
                this.condition.await(millis);
                LOG.log(Level.FINEST, "Acquisition woke up");
            }
        }
        finally {
            if (this.handlerThread.isInterrupted()) {
                Thread.currentThread().interrupt();
            }
        }
    }

    protected void addRequest(FetchAndLockRequest request) {
        if (!this.queue.offer(request)) {
            AsyncResponse asyncResponse = request.getAsyncResponse();
            this.errorTooManyRequests(asyncResponse);
        }
        this.condition.signal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected FetchAndLockResult tryFetchAndLock(FetchAndLockRequest request) {
        ProcessEngine processEngine = null;
        IdentityService identityService = null;
        FetchAndLockResult result = null;
        try {
            processEngine = this.getProcessEngine(request);
            identityService = processEngine.getIdentityService();
            identityService.setAuthentication(request.getAuthentication());
            FetchExternalTasksExtendedDto fetchingDto = request.getDto();
            List<LockedExternalTaskDto> lockedTasks = this.executeFetchAndLock(fetchingDto, processEngine);
            result = FetchAndLockResult.successful(lockedTasks);
        }
        catch (Exception e) {
            result = FetchAndLockResult.failed(e);
        }
        finally {
            if (identityService != null) {
                identityService.clearAuthentication();
            }
        }
        return result;
    }

    protected List<LockedExternalTaskDto> executeFetchAndLock(FetchExternalTasksExtendedDto fetchingDto, ProcessEngine processEngine) {
        ExternalTaskQueryTopicBuilder fetchBuilder = fetchingDto.buildQuery(processEngine);
        List externalTasks = fetchBuilder.execute();
        return LockedExternalTaskDto.fromLockedExternalTasks(externalTasks);
    }

    protected void errorTooManyRequests(AsyncResponse asyncResponse) {
        String errorMessage = "At the moment the server has to handle too many requests at the same time. Please try again later.";
        asyncResponse.resume((Throwable)new InvalidRequestException(Response.Status.INTERNAL_SERVER_ERROR, errorMessage));
    }

    protected void rejectPendingRequests() {
        for (FetchAndLockRequest pendingRequest : this.pendingRequests) {
            AsyncResponse asyncResponse = pendingRequest.getAsyncResponse();
            asyncResponse.resume((Throwable)new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Request rejected due to shutdown of application server."));
        }
    }

    protected ProcessEngine getProcessEngine(FetchAndLockRequest request) {
        String processEngineName = request.getProcessEngineName();
        return EngineUtil.lookupProcessEngine(processEngineName);
    }

    protected boolean isExpired(FetchAndLockRequest request) {
        long currentTime = ClockUtil.getCurrentTime().getTime();
        long timeout = request.getTimeoutTimestamp();
        return timeout <= currentTime;
    }

    @Override
    public void addPendingRequest(FetchExternalTasksExtendedDto dto, AsyncResponse asyncResponse, ProcessEngine processEngine) {
        Long asyncResponseTimeout = dto.getAsyncResponseTimeout();
        if (asyncResponseTimeout != null && asyncResponseTimeout > 1800000L) {
            asyncResponse.resume((Throwable)new InvalidRequestException(Response.Status.BAD_REQUEST, "The asynchronous response timeout cannot be set to a value greater than 1800000 milliseconds"));
            return;
        }
        IdentityService identityService = processEngine.getIdentityService();
        Authentication authentication = identityService.getCurrentAuthentication();
        String processEngineName = processEngine.getName();
        FetchAndLockRequest incomingRequest = new FetchAndLockRequest().setProcessEngineName(processEngineName).setAsyncResponse(asyncResponse).setAuthentication(authentication).setDto(dto);
        LOG.log(Level.FINEST, "New request: {0}", incomingRequest);
        FetchAndLockResult result = this.tryFetchAndLock(incomingRequest);
        LOG.log(Level.FINEST, "Fetch and lock result: {0}", result);
        if (result.wasSuccessful()) {
            List<LockedExternalTaskDto> lockedTasks = result.getTasks();
            if (!lockedTasks.isEmpty() || dto.getAsyncResponseTimeout() == null) {
                asyncResponse.resume(lockedTasks);
                LOG.log(Level.FINEST, "Resuming request with {0}", lockedTasks);
            } else {
                this.addRequest(incomingRequest);
                LOG.log(Level.FINEST, "Deferred request");
            }
        } else {
            Throwable processEngineException = result.getThrowable();
            asyncResponse.resume(processEngineException);
            LOG.log(Level.FINEST, "Resuming request with error", processEngineException);
        }
    }

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        ServletContext servletContext = null;
        if (servletContextEvent != null && (servletContext = servletContextEvent.getServletContext()) != null) {
            this.parseUniqueWorkerRequestParam(servletContext.getInitParameter(UNIQUE_WORKER_REQUEST_PARAM_NAME));
        }
    }

    protected void parseUniqueWorkerRequestParam(String uniqueWorkerRequestParam) {
        this.isUniqueWorkerRequest = uniqueWorkerRequestParam != null ? Boolean.parseBoolean(uniqueWorkerRequestParam) : false;
    }

    public List<FetchAndLockRequest> getPendingRequests() {
        return this.pendingRequests;
    }
}

