/*
 * Decompiled with CFR 0.152.
 */
package org.pinus4j.api;

import java.util.ArrayList;
import java.util.List;
import org.pinus4j.api.ITask;
import org.pinus4j.api.TaskFuture;
import org.pinus4j.api.query.IQuery;
import org.pinus4j.cluster.DB;
import org.pinus4j.cluster.IDBCluster;
import org.pinus4j.cluster.beans.DBInfo;
import org.pinus4j.datalayer.IRecordIterator;
import org.pinus4j.datalayer.iterator.AbstractRecordIterator;
import org.pinus4j.datalayer.iterator.GlobalRecordIterator;
import org.pinus4j.datalayer.iterator.ShardingRecordIterator;
import org.pinus4j.exceptions.DBClusterException;
import org.pinus4j.exceptions.DBOperationException;
import org.pinus4j.exceptions.TaskException;
import org.pinus4j.utils.ReflectUtil;
import org.pinus4j.utils.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs an {@link ITask} over the records of one entity class.
 *
 * <p>Records are pulled in batches from record iterators and every batch is submitted to a
 * {@link ThreadPool} as a {@link RecrodThread}. For a sharding entity one
 * {@link RecrodReaderThread} per master sharding DB is submitted, and that reader thread
 * pulls the batches; for any other entity the batches are pulled on the calling thread.
 *
 * <p>The nested class names keep their original spelling ("Recrod") because they are public
 * and may be referenced by other code.
 *
 * @param <E> the entity type whose records are processed
 */
public class TaskExecutor<E> {
    public static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
    private static final String THREADPOOL_NAME = "pinus";
    private final Class<E> clazz;
    private final IDBCluster dbCluster;

    /**
     * Creates an executor for one entity class.
     *
     * @param clazz     the entity class whose records are read
     * @param dbCluster the cluster used to look up the databases to read from
     */
    public TaskExecutor(Class<E> clazz, IDBCluster dbCluster) {
        this.clazz = clazz;
        this.dbCluster = dbCluster;
    }

    /**
     * Runs the task without a query; equivalent to {@code execute(task, null)}.
     *
     * @param task the task to run
     * @return the future created for this run
     */
    public TaskFuture execute(ITask<E> task) {
        return this.execute(task, null);
    }

    /**
     * Initializes the task, creates a thread pool and submits the record batches to it.
     *
     * @param task  the task to run; its {@code init()} is called first
     * @param query the query handed to every record iterator; may be {@code null}
     *              (the single-argument overload passes {@code null})
     * @return the future created with the total record count, the pool and the task
     * @throws TaskException        if {@code task.init()} throws any exception
     * @throws DBOperationException if the master global connection cannot be obtained
     */
    public TaskFuture execute(ITask<E> task, IQuery query) {
        try {
            task.init();
        }
        catch (Exception e) {
            throw new TaskException(e);
        }
        ThreadPool threadPool = ThreadPool.newInstance(THREADPOOL_NAME);
        // Resolved before branching, exactly as before, even though only the global path uses it.
        String clusterName = ReflectUtil.getClusterName(this.clazz);
        if (ReflectUtil.isShardingEntity(this.clazz)) {
            return this.executeSharding(task, query, threadPool);
        }
        return this.executeGlobal(task, query, threadPool, clusterName);
    }

    /**
     * Sharding path: builds one iterator per master sharding DB, sums their counts into the
     * future, then submits one reader thread per iterator.
     */
    private TaskFuture executeSharding(ITask<E> task, IQuery query, ThreadPool threadPool) {
        List<DB> dbs = this.dbCluster.getAllMasterShardingDB(this.clazz);
        List<AbstractRecordIterator<E>> readers = new ArrayList<AbstractRecordIterator<E>>(dbs.size());
        long total = 0L;
        for (DB db : dbs) {
            AbstractRecordIterator<E> reader = new ShardingRecordIterator<E>(db, this.clazz);
            this.configure(reader, task, query);
            readers.add(reader);
            total += reader.getCount();
        }
        // The future needs the grand total, so readers are only started after all counts are known.
        TaskFuture future = new TaskFuture(total, threadPool, task);
        for (AbstractRecordIterator<E> reader : readers) {
            threadPool.submit(new RecrodReaderThread<E>(reader, threadPool, task, future));
        }
        return future;
    }

    /**
     * Global path: reads batches from the master global connection on the calling thread and
     * submits each batch to the pool.
     */
    private TaskFuture executeGlobal(ITask<E> task, IQuery query, ThreadPool threadPool, String clusterName) {
        DBInfo dbConnInfo;
        try {
            dbConnInfo = this.dbCluster.getMasterGlobalConn(clusterName);
        }
        catch (DBClusterException e) {
            throw new DBOperationException(e);
        }
        AbstractRecordIterator<E> reader = new GlobalRecordIterator<E>(dbConnInfo, this.clazz);
        this.configure(reader, task, query);
        TaskFuture future = new TaskFuture(reader.getCount(), threadPool, task);
        while (reader.hasNext()) {
            List<E> batch = reader.nextMore();
            threadPool.submit(new RecrodThread<E>(batch, task, future));
        }
        return future;
    }

    /** Applies the task's buffer size (only when positive) and the query to a new iterator. */
    private void configure(AbstractRecordIterator<E> reader, ITask<E> task, IQuery query) {
        if (task.taskBuffer() > 0) {
            reader.setStep(task.taskBuffer());
        }
        reader.setQuery(query);
    }

    /**
     * Worker that hands one batch of records to the task and then reports the batch size to
     * the future.
     *
     * @param <E> the entity type of the records
     */
    public static class RecrodThread<E>
    implements Runnable {
        public static final Logger LOG = LoggerFactory.getLogger(RecrodThread.class);
        private final List<E> record;
        private final ITask<E> task;
        private final TaskFuture future;

        /**
         * @param record the batch of records to process
         * @param task   the task that receives the batch
         * @param future the future notified once the batch has been handled
         */
        public RecrodThread(List<E> record, ITask<E> task, TaskFuture future) {
            this.record = record;
            this.task = task;
            this.future = future;
        }

        /**
         * Calls {@code batchRecord} and then {@code afterBatch} on the task. Any exception is
         * logged as a warning and not rethrown; the future is updated with the batch size
         * whether or not the task failed.
         */
        @Override
        public void run() {
            try {
                this.task.batchRecord(this.record);
                this.task.afterBatch();
            }
            catch (Exception e) {
                LOG.warn("do task failure " + this.record, e);
            }
            finally {
                // Null-safe: a null batch must not throw here and skip the future bookkeeping.
                int size = this.record == null ? 0 : this.record.size();
                this.future.down(size);
                this.future.incrCount(size);
            }
        }
    }

    /**
     * Worker that drains one record iterator, submitting every batch it reads to the thread
     * pool as a {@link RecrodThread}.
     *
     * @param <E> the entity type of the records
     */
    public static class RecrodReaderThread<E>
    implements Runnable {
        private final IRecordIterator<E> recordReader;
        private final ThreadPool threadPool;
        private final ITask<E> task;
        private final TaskFuture future;

        /**
         * @param recordReader the iterator to drain
         * @param threadPool   the pool that receives one worker per batch
         * @param task         the task passed on to every batch worker
         * @param future       the future passed on to every batch worker
         */
        public RecrodReaderThread(IRecordIterator<E> recordReader, ThreadPool threadPool, ITask<E> task, TaskFuture future) {
            this.recordReader = recordReader;
            this.threadPool = threadPool;
            this.task = task;
            this.future = future;
        }

        /** Reads batches until the iterator reports no more and submits each one to the pool. */
        @Override
        public void run() {
            while (this.recordReader.hasNext()) {
                List<E> batch = this.recordReader.nextMore();
                this.threadPool.submit(new RecrodThread<E>(batch, this.task, this.future));
            }
        }
    }
}

