/*
 * Decompiled with CFR 0.152.
 */
package org.bonitasoft.engine.connector.impl;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.bonitasoft.engine.commons.exceptions.SBonitaException;
import org.bonitasoft.engine.connector.ConnectorExecutor;
import org.bonitasoft.engine.connector.SConnector;
import org.bonitasoft.engine.connector.exception.SConnectorException;
import org.bonitasoft.engine.connector.exception.SConnectorValidationException;
import org.bonitasoft.engine.connector.impl.ConnectorExecutorThreadFactory;
import org.bonitasoft.engine.log.technical.TechnicalLogSeverity;
import org.bonitasoft.engine.log.technical.TechnicalLoggerService;
import org.bonitasoft.engine.session.SessionService;
import org.bonitasoft.engine.sessionaccessor.SessionAccessor;
import org.bonitasoft.engine.sessionaccessor.SessionIdNotSetException;

public class ConnectorExecutorImpl
implements ConnectorExecutor {
    private ThreadPoolExecutor threadPoolExecutor;
    private final SessionAccessor sessionAccessor;
    private final SessionService sessionService;
    private final int queueCapacity;
    private final int corePoolSize;
    private final int maximumPoolSize;
    private final long keepAliveTimeSeconds;
    private final TechnicalLoggerService loggerService;

    public ConnectorExecutorImpl(int queueCapacity, int corePoolSize, TechnicalLoggerService loggerService, int maximumPoolSize, long keepAliveTimeSeconds, SessionAccessor sessionAccessor, SessionService sessionService) {
        this.queueCapacity = queueCapacity;
        this.corePoolSize = corePoolSize;
        this.loggerService = loggerService;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTimeSeconds = keepAliveTimeSeconds;
        this.sessionAccessor = sessionAccessor;
        this.sessionService = sessionService;
    }

    @Override
    public Map<String, Object> execute(SConnector sConnector, Map<String, Object> inputParameters) throws SConnectorException {
        if (this.threadPoolExecutor == null) {
            throw new SConnectorException("Unable to execute a connector if the node is node started. Start it first");
        }
        ExecuteConnectorCallable callable = new ExecuteConnectorCallable(inputParameters, sConnector);
        Future<Map<String, Object>> submit = this.threadPoolExecutor.submit(callable);
        try {
            return this.getValue(submit);
        }
        catch (InterruptedException e) {
            this.disconnect(sConnector);
            throw new SConnectorException(e);
        }
        catch (ExecutionException e) {
            this.disconnect(sConnector);
            throw new SConnectorException(e);
        }
        catch (TimeoutException e) {
            submit.cancel(true);
            this.disconnect(sConnector);
            throw new SConnectorException("The connector timed out " + sConnector);
        }
    }

    protected Map<String, Object> getValue(Future<Map<String, Object>> submit) throws InterruptedException, ExecutionException, TimeoutException {
        return submit.get();
    }

    @Override
    public void disconnect(SConnector sConnector) throws SConnectorException {
        try {
            sConnector.disconnect();
        }
        catch (SConnectorException e) {
            throw e;
        }
        catch (Throwable t) {
            throw new SConnectorException(t);
        }
    }

    @Override
    public void start() throws SBonitaException {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(this.queueCapacity);
        QueueRejectedExecutionHandler handler = new QueueRejectedExecutionHandler(this.loggerService);
        ConnectorExecutorThreadFactory threadFactory = new ConnectorExecutorThreadFactory("ConnectorExecutor");
        this.threadPoolExecutor = new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, this.keepAliveTimeSeconds, TimeUnit.SECONDS, workQueue, threadFactory, handler);
    }

    @Override
    public void stop() {
        if (this.threadPoolExecutor != null) {
            this.threadPoolExecutor.shutdown();
            try {
                if (!this.threadPoolExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                    this.loggerService.log(this.getClass(), TechnicalLogSeverity.WARNING, "Timeout  (5s) trying to stop the connector executor thread pool");
                }
            }
            catch (InterruptedException e) {
                this.loggerService.log(this.getClass(), TechnicalLogSeverity.WARNING, "Error while stopping the connector executor thread pool", e);
            }
        }
    }

    private final class QueueRejectedExecutionHandler
    implements RejectedExecutionHandler {
        private final TechnicalLoggerService logger;

        public QueueRejectedExecutionHandler(TechnicalLoggerService logger) {
            this.logger = logger;
        }

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (this.logger.isLoggable(this.getClass(), TechnicalLogSeverity.WARNING)) {
                this.logger.log(ThreadPoolExecutor.class, TechnicalLogSeverity.WARNING, "The work was rejected, requeue work: " + task.toString());
            }
            try {
                executor.getQueue().put(task);
            }
            catch (InterruptedException e) {
                throw new RejectedExecutionException("queuing " + task + " got interrupted", e);
            }
        }
    }

    private final class ExecuteConnectorCallable
    implements Callable<Map<String, Object>> {
        private final Map<String, Object> inputParameters;
        private final SConnector sConnector;

        private ExecuteConnectorCallable(Map<String, Object> inputParameters, SConnector sConnector) {
            this.inputParameters = inputParameters;
            this.sConnector = sConnector;
        }

        @Override
        public Map<String, Object> call() throws Exception {
            this.sConnector.setInputParameters(this.inputParameters);
            try {
                this.sConnector.validate();
                this.sConnector.connect();
                Map<String, Object> map = this.sConnector.execute();
                return map;
            }
            catch (SConnectorValidationException e) {
                throw new SConnectorException(e);
            }
            finally {
                try {
                    long sessionId = ConnectorExecutorImpl.this.sessionAccessor.getSessionId();
                    ConnectorExecutorImpl.this.sessionAccessor.deleteSessionId();
                    ConnectorExecutorImpl.this.sessionService.deleteSession(sessionId);
                }
                catch (SessionIdNotSetException e) {}
            }
        }
    }
}

