/*
 * Decompiled with CFR 0.152.
 */
package org.openksavi.sponge.core.engine.processing;

import com.google.common.collect.Iterables;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.openksavi.sponge.EventSetProcessorAdapter;
import org.openksavi.sponge.EventSetProcessorAdapterGroup;
import org.openksavi.sponge.EventSetProcessorDefinition;
import org.openksavi.sponge.EventSetProcessorState;
import org.openksavi.sponge.core.engine.processing.BaseEventSetProcessorMainProcessingUnitHandler;
import org.openksavi.sponge.core.engine.processing.BaseMainProcessingUnit;
import org.openksavi.sponge.engine.ProcessorType;
import org.openksavi.sponge.event.Event;

/**
 * Main processing unit handler that passes an event to a list of event set processor adapters
 * either sequentially in the calling thread or concurrently on the asynchronous event set
 * processor thread pool.
 *
 * <p>The sequential path is used when the processor is synchronous (per its definition metadata,
 * or the engine-wide default when the metadata leaves it unspecified) or when the number of
 * adapters does not exceed the configured asynchronous processing threshold.
 *
 * @param <G> the event set processor adapter group type.
 * @param <T> the event set processor adapter type.
 */
public class SyncAsyncEventSetProcessorMainProcessingUnitHandler<G extends EventSetProcessorAdapterGroup<T>, T extends EventSetProcessorAdapter<?>>
extends BaseEventSetProcessorMainProcessingUnitHandler<G, T> {
    /** Maximum number of adapters submitted to the thread pool (and awaited) as one batch. */
    private final int asyncEventSetProcessorProcessingPartitionSize;

    /** Adapter count up to which (inclusive) processing stays in the calling thread. */
    private final int asyncEventSetProcessorProcessingThreshold;

    /**
     * Creates the handler and reads the partition size and the threshold from the engine default
     * parameters.
     *
     * @param type the processor type handled by this handler.
     * @param processingUnit the owning main processing unit.
     */
    public SyncAsyncEventSetProcessorMainProcessingUnitHandler(ProcessorType type, BaseMainProcessingUnit processingUnit) {
        super(type, processingUnit);
        this.asyncEventSetProcessorProcessingPartitionSize =
                this.getProcessingUnit().getEngine().getDefaultParameters().getAsyncEventSetProcessorProcessingPartitionSize();
        this.asyncEventSetProcessorProcessingThreshold =
                this.getProcessingUnit().getEngine().getDefaultParameters().getAsyncEventSetProcessorProcessingThreshold();
    }

    /**
     * Passes the event to every given adapter, choosing between sequential and concurrent
     * processing.
     *
     * @param processorDefinition the definition of the event set processor the adapters belong to.
     * @param eventSetProcessorAdapters the adapters that should receive the event.
     * @param event the event to process.
     */
    public void processEventForEventSetProcessorAdapters(EventSetProcessorDefinition processorDefinition, List<T> eventSetProcessorAdapters, Event event) {
        boolean sequential = this.isSynchronous(processorDefinition)
                || eventSetProcessorAdapters.size() <= this.asyncEventSetProcessorProcessingThreshold;
        if (sequential) {
            this.processSynchronously(eventSetProcessorAdapters, event);
        } else {
            this.processAsynchronously(eventSetProcessorAdapters, event);
        }
    }

    /**
     * Tells whether the processor must be processed synchronously.
     *
     * @param processorDefinition the processor definition.
     * @return the flag from the definition metadata when it is set, otherwise the default taken
     *         from the engine configuration manager.
     */
    private boolean isSynchronous(EventSetProcessorDefinition processorDefinition) {
        Boolean synchronous = processorDefinition.getMeta().isSynchronous();
        if (synchronous != null) {
            return synchronous;
        }
        return this.getProcessingUnit().getEngine().getConfigurationManager().getEventSetProcessorDefaultSynchronous();
    }

    /**
     * Passes the event to a single adapter. Adapters in the {@code FINISHED} state are skipped.
     * Anything thrown by the adapter is reported to the engine error handler rather than
     * propagated to the caller.
     *
     * @param adapter the adapter that should process the event.
     * @param event the event to process.
     */
    protected void processAdapter(T adapter, Event event) {
        if (adapter.getState() == EventSetProcessorState.FINISHED) {
            return;
        }
        try {
            adapter.processEvent(event);
        } catch (Throwable e) {
            this.getProcessingUnit().getEngine().handleError(adapter, e);
        }
    }

    /**
     * Processes the event by all adapters one after another in the calling thread.
     *
     * @param adapters the adapters that should receive the event.
     * @param event the event to process.
     */
    protected void processSynchronously(List<T> adapters, Event event) {
        for (T adapter : adapters) {
            this.processAdapter(adapter, event);
        }
    }

    /**
     * Processes the event by the adapters using the asynchronous event set processor thread pool.
     * The adapters are split into partitions of at most the configured partition size; all
     * adapters of one partition are submitted together and the calling thread waits for the whole
     * partition to complete before submitting the next one.
     *
     * @param adapters the adapters that should receive the event.
     * @param event the event to process.
     */
    protected void processAsynchronously(List<T> adapters, Event event) {
        // The partition must be typed as List<T>: a raw List would erase the stream element type,
        // turning the lambda parameter into Object and forcing an unchecked array cast.
        for (List<T> partition : Iterables.partition(adapters, this.asyncEventSetProcessorProcessingPartitionSize)) {
            CompletableFuture.allOf(partition.stream().map(adapter -> CompletableFuture.runAsync(() -> {
                try {
                    this.processAdapter(adapter, event);
                } catch (Throwable e) {
                    // Last-resort guard so that a failure cannot surface as an exceptional
                    // completion of the future that join() would rethrow.
                    this.getProcessingUnit().getEngine().handleError(SyncAsyncEventSetProcessorMainProcessingUnitHandler.class.getSimpleName() + ".processAsynchronously", e);
                }
            }, this.getAsyncEventSetProcessorThreadPool().getExecutor())).toArray(CompletableFuture[]::new)).join();
        }
    }
}

