package xin.manong.stream.framework.processor;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.stream.sdk.common.UnacceptableException;
import xin.manong.weapon.base.common.Context;
import xin.manong.weapon.base.record.KVRecords;

/* loaded from: input_file:xin/manong/stream/framework/processor/ProcessorGraph.class */
public class ProcessorGraph {
    private static final Logger logger = LoggerFactory.getLogger(ProcessorGraph.class);
    private String id = UUID.randomUUID().toString();
    private List<ProcessorConfig> processorGraphConfig;
    private Map<String, Processor> processors;

    public ProcessorGraph(List<ProcessorConfig> list) {
        this.processorGraphConfig = list;
    }

    public final void buildGraph() throws UnacceptableException {
        checkGraph();
        this.processors = new HashMap();
        for (ProcessorConfig processorConfig : this.processorGraphConfig) {
            Processor processor = new Processor();
            if (!processor.init(processorConfig)) {
                throw new UnacceptableException(String.format("init processor[%s] failed", processorConfig.name));
            }
            this.processors.put(processor.name, processor);
        }
        for (ProcessorConfig processorConfig2 : this.processorGraphConfig) {
            Processor processor2 = this.processors.get(processorConfig2.name);
            for (Map.Entry<String, String> entry : processorConfig2.processors.entrySet()) {
                String value = entry.getValue();
                if (!this.processors.containsKey(value)) {
                    logger.error("processor[{}] is not found for building graph", value);
                    throw new UnacceptableException(String.format("processor[%s] is not found for building graph", value));
                }
                processor2.setProcessor(entry.getKey(), this.processors.get(value));
            }
        }
        logger.info("build processor graph success");
    }

    public final void closeGraph() {
        if (this.processors == null) {
            return;
        }
        List<Processor> findStartProcessors = findStartProcessors();
        while (!findStartProcessors.isEmpty()) {
            Processor remove = findStartProcessors.remove(0);
            for (Processor processor : remove.processors.values()) {
                if (!findStartProcessors.contains(processor)) {
                    findStartProcessors.add(processor);
                }
            }
            remove.destroy();
        }
        logger.info("close processor graph success");
    }

    public final void process(String str, KVRecords kVRecords, Context context) throws Exception {
        if (StringUtils.isEmpty(str)) {
            logger.warn("processor name is empty");
            throw new UnacceptableException("processor name is empty");
        }
        Processor orDefault = this.processors.getOrDefault(str, null);
        if (orDefault == null) {
            logger.warn("processor is not found for name[{}]", str);
            throw new UnacceptableException(String.format("processor is not found for name[%s]", str));
        }
        orDefault.process(kVRecords, context);
    }

    public final boolean containsProcessor(String str) {
        return this.processors != null && this.processors.containsKey(str);
    }

    private void checkGraph() throws UnacceptableException {
        if (this.processorGraphConfig == null || this.processorGraphConfig.isEmpty()) {
            logger.error("processor graph config is empty");
            throw new UnacceptableException("processor graph config is empty");
        }
        Map<String, ProcessorConfig> map = (Map) this.processorGraphConfig.stream().collect(Collectors.toMap(processorConfig -> {
            return processorConfig.name;
        }, processorConfig2 -> {
            return processorConfig2;
        }));
        if (map.size() != this.processorGraphConfig.size()) {
            logger.error("the same processor exists");
            throw new UnacceptableException("the same processor exists");
        }
        HashSet hashSet = new HashSet();
        Iterator<String> it = map.keySet().iterator();
        while (it.hasNext()) {
            hashSet.addAll(checkGraph(it.next(), map));
            if (hashSet.size() == this.processorGraphConfig.size()) {
                return;
            }
        }
    }

    private Set<String> checkGraph(String str, Map<String, ProcessorConfig> map) throws UnacceptableException {
        HashSet hashSet = new HashSet();
        LinkedList linkedList = new LinkedList();
        hashSet.add(str);
        linkedList.add(str);
        while (!linkedList.isEmpty()) {
            for (String str2 : map.get(linkedList.remove(0)).processors.values()) {
                if (!map.containsKey(str2)) {
                    logger.error("processor[{}] is not found for building graph", str2);
                    throw new UnacceptableException(String.format("processor[%s] is not found for building graph", str2));
                }
                if (hashSet.contains(str2)) {
                    logger.error("check graph failed, find cycle in graph for processor[{}]", str2);
                    throw new UnacceptableException(String.format("check graph failed, find cycle in graph for processor[%s]", str2));
                }
                linkedList.add(str2);
            }
        }
        return hashSet;
    }

    private List<Processor> findStartProcessors() {
        HashMap hashMap = new HashMap(this.processors);
        Iterator<Processor> it = this.processors.values().iterator();
        while (it.hasNext()) {
            for (Processor processor : it.next().processors.values()) {
                if (hashMap.containsKey(processor.name)) {
                    hashMap.remove(processor.name);
                }
            }
        }
        return new LinkedList(hashMap.values());
    }

    public String getId() {
        return this.id;
    }
}
