package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.computer.core.rpc.AggregateRpcService;
import com.baidu.hugegraph.util.E;
import java.util.HashMap;
import java.util.Map;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/aggregator/WorkerAggrManager.class */
/**
 * Worker-side {@link Manager} that keeps track of aggregators.
 *
 * <p>It holds three pieces of state:
 * <ul>
 *   <li>the registered aggregators, fetched through the
 *       {@link AggregateRpcService} in {@link #init(Config)};</li>
 *   <li>the "last" aggregator values, reloaded from the service before each
 *       superstep and served by {@link #aggregatedValue(String)};</li>
 *   <li>the "current" aggregators, which collect values passed to
 *       {@link #aggregateValue(String, Value)} and are sent to the service
 *       after each superstep.</li>
 * </ul>
 *
 * <p>{@link #service(AggregateRpcService)} must be called before any method
 * that talks to the service, otherwise the not-null check in the private
 * {@code service()} accessor fails.
 */
public class WorkerAggrManager implements Manager {

    public static final String NAME = "worker_aggr";

    private final ComputerContext context;

    /** RPC service used to exchange aggregators; unset until injected. */
    private AggregateRpcService service = null;

    /** Registered aggregators; replaced in {@link #init(Config)}. */
    private RegisterAggregators registerAggregators = new RegisterAggregators();

    /** Values loaded from the service before the running superstep. */
    private Map<String, Value> lastAggregators = new HashMap<>();

    /** Aggregators collecting values during the running superstep. */
    private Aggregators currentAggregators = new Aggregators();

    /**
     * Creates a manager bound to the given context.
     *
     * @param computerContext context later handed to
     *                        {@code RegisterAggregators.repair(...)} in
     *                        {@link #init(Config)}
     */
    public WorkerAggrManager(ComputerContext computerContext) {
        this.context = computerContext;
    }

    /**
     * @return the manager name, {@link #NAME}
     */
    @Override
    public String name() {
        return NAME;
    }

    /**
     * Fetches the registered aggregators through the RPC service and calls
     * {@code repair} on them with this manager's context.
     *
     * @param config not used by this implementation
     */
    @Override
    public void init(Config config) {
        this.registerAggregators = this.service().registeredAggregators();
        this.registerAggregators.repair(this.context);
    }

    /**
     * Clears the registered, last and current aggregators.
     *
     * @param config not used by this implementation
     */
    @Override
    public void close(Config config) {
        this.registerAggregators.clear();
        this.lastAggregators.clear();
        this.currentAggregators.clear();
    }

    /**
     * Reloads the last aggregator values from the service and resets the
     * current aggregators from copies of the registered ones.
     *
     * @param config not used by this implementation
     * @param i      presumably the superstep index (TODO confirm against
     *               {@link Manager}); not used by this implementation
     */
    @Override
    public void beforeSuperstep(Config config, int i) {
        this.reloadAggregators();
    }

    /**
     * Sends the current aggregators to the service and clears them.
     *
     * @param config not used by this implementation
     * @param i      presumably the superstep index (TODO confirm against
     *               {@link Manager}); not used by this implementation
     */
    @Override
    public void afterSuperstep(Config config, int i) {
        this.flushAggregators();
    }

    /**
     * Injects the RPC service used by this manager.
     *
     * @param aggregateRpcService the service; checked to be non-null
     */
    public void service(AggregateRpcService aggregateRpcService) {
        E.checkNotNull(aggregateRpcService, "service");
        this.service = aggregateRpcService;
    }

    /**
     * Returns the result of {@code RegisterAggregators.copy(str)} for the
     * registered aggregators.
     *
     * @param str aggregator name
     * @param <V> value type the caller expects the aggregator to handle
     * @return the aggregator produced by the registered aggregators' copy
     */
    // The value type is chosen by the caller and cannot be verified here.
    @SuppressWarnings("unchecked")
    public <V extends Value> Aggregator<V> createAggregator(String str) {
        return (Aggregator<V>) this.registerAggregators.copy(str);
    }

    /**
     * Aggregates a value into the current aggregator with the given name.
     *
     * @param str aggregator name
     * @param v   value to aggregate; must not be null
     * @param <V> value type
     */
    public <V extends Value> void aggregateValue(String str, V v) {
        E.checkArgument(v != null,
                        "Can't set value to null for aggregator '%s'", str);
        Aggregator<V> aggregator = this.currentAggregators.get(str,
                                                               this.service());
        /*
         * Updates are serialized per aggregator instance.
         * NOTE(review): presumably several compute threads of one worker may
         * call this concurrently - confirm.
         */
        synchronized (aggregator) {
            // Pass the value itself: it is a Value, not an Aggregator.
            aggregator.aggregateValue(v);
        }
    }

    /**
     * Returns the value loaded for the named aggregator by the most recent
     * reload (see {@link #beforeSuperstep(Config, int)}).
     *
     * @param str aggregator name
     * @param <V> value type the caller expects
     * @return the stored value; the argument check fails if none is found
     */
    // The value type is chosen by the caller and cannot be verified here.
    @SuppressWarnings("unchecked")
    public <V extends Value> V aggregatedValue(String str) {
        V v = (V) this.lastAggregators.get(str);
        E.checkArgument(v != null,
                        "Can't find aggregator value with name '%s'", str);
        return v;
    }

    /** Sends the current aggregator values to the service, then clears them. */
    private void flushAggregators() {
        this.service().aggregateAggregators(this.currentAggregators.values());
        this.currentAggregators.clear();
    }

    /**
     * Replaces the last values with those listed by the service and rebuilds
     * the current aggregators from copies of the registered ones.
     */
    private void reloadAggregators() {
        this.lastAggregators = this.service().listAggregators();
        E.checkNotNull(this.lastAggregators, "lastAggregators");
        this.currentAggregators = new Aggregators(
                                  this.registerAggregators.copyAll());
    }

    /**
     * @return the injected RPC service; the not-null check fails if
     *         {@link #service(AggregateRpcService)} has not been called
     */
    private AggregateRpcService service() {
        E.checkArgumentNotNull(this.service, "Not init AggregateRpcService");
        return this.service;
    }
}
