/*
 * Decompiled with CFR 0.152.
 */
package org.slingerxv.limitart.rpcx.center;

import io.netty.channel.Channel;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.quartz.JobDataMap;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slingerxv.limitart.collections.ConcurrentHashSet;
import org.slingerxv.limitart.net.AddressPair;
import org.slingerxv.limitart.net.binary.BinaryServer;
import org.slingerxv.limitart.net.binary.handler.IHandler;
import org.slingerxv.limitart.net.binary.message.Message;
import org.slingerxv.limitart.net.binary.message.MessageFactory;
import org.slingerxv.limitart.net.binary.message.exception.MessageCodecException;
import org.slingerxv.limitart.rpcx.center.config.ServiceCenterXConfig;
import org.slingerxv.limitart.rpcx.center.schedule.ScheduleTask;
import org.slingerxv.limitart.rpcx.center.struct.ServiceXClientSession;
import org.slingerxv.limitart.rpcx.center.struct.ServiceXServerSession;
import org.slingerxv.limitart.rpcx.message.schedule.AddScheduleToServiceCenterProviderMessage;
import org.slingerxv.limitart.rpcx.message.service.NoticeProviderDisconnectedServiceCenterMessage;
import org.slingerxv.limitart.rpcx.message.service.PushServiceToServiceCenterProviderMessage;
import org.slingerxv.limitart.rpcx.message.service.SubscribeServiceFromServiceCenterConsumerMessage;
import org.slingerxv.limitart.rpcx.message.service.SubscribeServiceResultServiceCenterMessage;
import org.slingerxv.limitart.rpcx.message.service.meta.ProviderHostMeta;
import org.slingerxv.limitart.rpcx.message.service.meta.ProviderServiceMeta;
import org.slingerxv.limitart.util.SchedulerUtil;
import org.slingerxv.limitart.util.StringUtil;
import org.slingerxv.limitart.util.TimeUtil;

public class ServiceCenterX {
    private static Logger log = LoggerFactory.getLogger(ServiceCenterX.class);
    private ServiceCenterXConfig config;
    private BinaryServer binaryServer;
    private Map<Integer, ServiceXServerSession> rpcServers = new ConcurrentHashMap<Integer, ServiceXServerSession>();
    private Map<String, ServiceXClientSession> rpcClients = new ConcurrentHashMap<String, ServiceXClientSession>();
    private Map<String, Set<Integer>> service2Providers = new ConcurrentHashMap<String, Set<Integer>>();
    private Map<String, Set<Integer>> schedules = new ConcurrentHashMap<String, Set<Integer>>();

    public ServiceCenterX(ServiceCenterXConfig config) throws Exception {
        Objects.requireNonNull(config, "config");
        this.config = config;
        this.binaryServer = new BinaryServer.BinaryServerBuilder().addressPair(new AddressPair(config.getPort())).factory(new MessageFactory().registerMsg(new SubscribeServiceFromServiceCenterConsumerHandler()).registerMsg(new PushServiceToServiceCenterProviderHandler()).registerMsg(new AddScheduleToServiceCenterProviderHandler())).onChannelStateChanged((channel, active) -> {
            if (!active.booleanValue()) {
                this.onDisconnect((Channel)channel);
            }
        }).dispatchMessage((message, handler) -> {
            message.setExtra(this);
            try {
                handler.handle(message);
            }
            catch (Exception e) {
                log.error("handle error", (Throwable)e);
            }
        }).build();
    }

    public ServiceCenterX bind() throws Exception {
        this.binaryServer.startServer();
        return this;
    }

    public ServiceCenterX stop() {
        this.binaryServer.stopServer();
        return this;
    }

    public ServiceCenterXConfig getConfig() {
        return this.config;
    }

    private void onProviderPublicServices(Channel channel, int providerUID, String providerIp, int providerPort, List<String> servicesName) {
        log.info("\u751f\u4ea7\u8005\uff1a" + providerUID + "\uff0cip\uff1a" + channel.remoteAddress() + "\u5f00\u59cb\u53d1\u5e03\u670d\u52a1...");
        for (String serviceName : servicesName) {
            Set<Integer> putIfAbsent;
            Set<Integer> rpcServiceLBData = this.service2Providers.get(serviceName);
            if (rpcServiceLBData == null && (putIfAbsent = this.service2Providers.putIfAbsent(serviceName, rpcServiceLBData = new ConcurrentHashSet<Integer>())) != null) {
                rpcServiceLBData = putIfAbsent;
            }
            rpcServiceLBData.add(providerUID);
            log.info("\u751f\u4ea7\u8005\uff1a" + providerUID + "\uff0cip\uff1a" + channel.remoteAddress() + "\u53d1\u5e03\u670d\u52a1\uff1a" + serviceName);
        }
        this.registerServerSession(channel, providerIp, providerPort, providerUID);
        log.info("\u751f\u4ea7\u8005\uff1a" + providerUID + "\uff0cip\uff1a" + channel.remoteAddress() + "\u53d1\u5e03\u670d\u52a1\u5b8c\u6bd5\uff01");
        this.broadcastSingleServiceProviderInfo(providerUID);
    }

    private void broadcastSingleServiceProviderInfo(int providerId) {
        if (this.service2Providers.isEmpty()) {
            return;
        }
        SubscribeServiceResultServiceCenterMessage msg = new SubscribeServiceResultServiceCenterMessage();
        for (Map.Entry<String, Set<Integer>> entry : this.service2Providers.entrySet()) {
            Set<Integer> data = entry.getValue();
            if (!data.contains(providerId)) continue;
            ProviderServiceMeta serviceMeta = new ProviderServiceMeta();
            serviceMeta.serviceName = entry.getKey();
            for (int tpid : data) {
                ServiceXServerSession serviceXServerSession = this.rpcServers.get(tpid);
                if (serviceXServerSession == null) continue;
                ProviderHostMeta hostMeta = new ProviderHostMeta();
                hostMeta.setIp(serviceXServerSession.getServerIp());
                hostMeta.setPort(serviceXServerSession.getServerPort());
                hostMeta.setProviderId(serviceXServerSession.getProviderId());
                serviceMeta.hostInfos.add(hostMeta);
            }
            msg.services.add(serviceMeta);
        }
        for (ServiceXClientSession session : this.rpcClients.values()) {
            try {
                this.binaryServer.sendMessage(session.getSession(), (Message)msg);
            }
            catch (MessageCodecException e) {
                log.error("send error", (Throwable)e);
            }
            log.info("\u670d\u52a1\u4e2d\u5fc3\u5e7f\u64ad[" + providerId + "]\u7684\u670d\u52a1\u5230" + session.getSession().remoteAddress());
        }
    }

    private void sendAllServiceProviderInfo2Consumer(Channel channel) {
        if (this.service2Providers.isEmpty()) {
            return;
        }
        SubscribeServiceResultServiceCenterMessage msg = new SubscribeServiceResultServiceCenterMessage();
        for (Map.Entry<String, Set<Integer>> entry : this.service2Providers.entrySet()) {
            String serviceName = entry.getKey();
            Set<Integer> data = entry.getValue();
            ProviderServiceMeta info = new ProviderServiceMeta();
            msg.services.add(info);
            info.serviceName = serviceName;
            for (int providerId : data) {
                ServiceXServerSession serviceXServerSession = this.rpcServers.get(providerId);
                if (serviceXServerSession == null) continue;
                ProviderHostMeta hostMeta = new ProviderHostMeta();
                hostMeta.setIp(serviceXServerSession.getServerIp());
                hostMeta.setPort(serviceXServerSession.getServerPort());
                hostMeta.setProviderId(serviceXServerSession.getProviderId());
                info.hostInfos.add(hostMeta);
            }
        }
        channel.writeAndFlush((Object)msg);
    }

    private void onProviderDisconnected(int providerId) {
        block0: for (Map.Entry<String, Set<Integer>> entry : this.service2Providers.entrySet()) {
            String serviceName = entry.getKey();
            Set<Integer> data = entry.getValue();
            Iterator<Integer> iterator = data.iterator();
            while (iterator.hasNext()) {
                Integer pid = iterator.next();
                if (pid != providerId) continue;
                iterator.remove();
                log.info("\u5220\u9664\u63d0\u4f9b\u8005" + providerId + "\uff0c\u7684\u670d\u52a1\uff1a" + serviceName);
                continue block0;
            }
        }
    }

    private ServiceXServerSession registerServerSession(Channel channel, String rpcServerIp, int rpcServerPort, int providerId) {
        if (this.rpcServers.containsKey(providerId)) {
            channel.close();
            log.error("\u670d\u52a1\u63d0\u4f9b\u8005ID\u91cd\u590d\uff0c\u65ad\u5f00\u94fe\u63a5\uff0cIP\uff1a" + channel.remoteAddress() + "\uff0c\u670d\u52a1\u8005ID\uff1a" + providerId);
            return null;
        }
        ServiceXServerSession session = new ServiceXServerSession();
        session.setSession(channel);
        session.setProviderId(providerId);
        session.setServerIp(rpcServerIp);
        session.setServerPort(rpcServerPort);
        this.rpcServers.put(session.getProviderId(), session);
        log.info("RPC\u751f\u4ea7\u8005[" + providerId + "]\u6ce8\u518c\u5230\u670d\u52a1\u4e2d\u5fc3\uff1a" + channel.remoteAddress() + "\uff0c\u751f\u4ea7\u8005\u4e2d\u5fc3\u5927\u5c0f\uff1a" + this.rpcServers.size());
        return session;
    }

    private ServiceXClientSession registerClientSession(Channel channel) {
        ServiceXClientSession session = new ServiceXClientSession();
        session.setSession(channel);
        this.rpcClients.put(channel.id().asLongText(), session);
        log.info("RPC\u6d88\u8d39\u8005\u6ce8\u518c\u5230\u670d\u52a1\u4e2d\u5fc3\uff1a" + channel.remoteAddress() + "\uff0c\u6d88\u8d39\u8005\u4e2d\u5fc3\u5927\u5c0f\uff1a" + this.rpcClients.size());
        return session;
    }

    private void onDisconnect(Channel channel) {
        String asLongText = channel.id().asLongText();
        Iterator<ServiceXServerSession> serversIt = this.rpcServers.values().iterator();
        int providerId = 0;
        while (serversIt.hasNext()) {
            ServiceXServerSession next = serversIt.next();
            if (!next.getSession().id().asLongText().equals(asLongText)) continue;
            serversIt.remove();
            providerId = next.getProviderId();
            log.info("RPC\u751f\u4ea7\u8005\uff0cproviderId" + providerId + "\uff0cIP\uff1a" + channel.remoteAddress() + "\u65ad\u5f00\u94fe\u63a5\uff0c\u5f53\u524dProviderSize\uff1a" + this.rpcServers.size());
            this.onProviderDisconnected(next.getProviderId());
            NoticeProviderDisconnectedServiceCenterMessage msg = new NoticeProviderDisconnectedServiceCenterMessage();
            msg.setProviderUID(next.getProviderId());
            for (ServiceXClientSession session : this.rpcClients.values()) {
                log.info("\u901a\u77e5\u5ba2\u6237\u7aef\uff1a" + session.getSession().remoteAddress() + "\uff0cProvider:" + providerId + "\uff0c" + next.getServerIp() + ":" + next.getServerPort() + "\u65ad\u5f00\u94fe\u63a5\uff01");
                session.getSession().writeAndFlush((Object)msg);
            }
        }
        Iterator<ServiceXClientSession> clientsIt = this.rpcClients.values().iterator();
        while (clientsIt.hasNext()) {
            if (!clientsIt.next().getSession().id().asLongText().equals(asLongText)) continue;
            log.info("\u5220\u9664session\uff1a" + asLongText + "\uff0c\u5f53\u524dClientSize\uff1a" + this.rpcClients.size());
            clientsIt.remove();
            break;
        }
        Iterator<Map.Entry<String, Set<Integer>>> iterator = this.schedules.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Set<Integer>> next = iterator.next();
            String scheduleName = next.getKey();
            Set<Integer> providers = next.getValue();
            if (providers.contains(providerId)) {
                providers.remove(providerId);
                log.info("\u65ad\u5f00\u94fe\u63a5\uff0c\u5220\u9664\u5b9a\u65f6\u670d\u52a1" + scheduleName + "\u7684ProviderId\uff1a" + providerId + "\uff0c\u5269\u4f59\u6267\u884c\u8005\u5927\u5c0f\uff1a" + providers.size());
            }
            if (!providers.isEmpty()) continue;
            iterator.remove();
            try {
                if (!SchedulerUtil.self().removeSchedule(scheduleName)) continue;
                log.info("\u5220\u9664\u5b9a\u65f6\u670d\u52a1\uff1a" + scheduleName + "\uff0c\u56e0\u4e3a\u6ca1\u6709\u6267\u884c\u4efb\u52a1\u7684provider");
            }
            catch (SchedulerException e) {
                log.error("delete error", (Throwable)e);
            }
        }
    }

    private void onAddSchedule(String jobName, int providerId, String cronExpression, int intervalInHours, int intervalInMinutes, int intervalInSeconds, int intervalInMillis, int repeatCount) {
        Set<Integer> putIfAbsent;
        Set<Integer> set = this.schedules.get(jobName);
        if (set == null && (putIfAbsent = this.schedules.putIfAbsent(jobName, set = new ConcurrentHashSet<Integer>())) != null) {
            set = putIfAbsent;
        }
        if (set.contains(providerId)) {
            log.error("\u4efb\u52a1" + jobName + "\u7684Provider" + providerId + "\u5b9a\u65f6\u4efb\u52a1\u53d1\u5e03\u91cd\u590d\uff01");
            return;
        }
        set.add(providerId);
        log.info("Provider" + providerId + "\u6ce8\u518c\u4e86\u4e00\u4e2a\u5b9a\u65f6\u4efb\u52a1:" + jobName);
        try {
            if (SchedulerUtil.self().hasSchedule(jobName)) {
                return;
            }
        }
        catch (SchedulerException e) {
            log.error("check error", (Throwable)e);
        }
        JobDataMap map = new JobDataMap();
        map.put(ScheduleTask.RPCSERVERS, this.rpcServers);
        map.put(ScheduleTask.SCHEDULES, this.schedules);
        if (!StringUtil.isEmptyOrNull(cronExpression)) {
            try {
                Trigger addSchedule = SchedulerUtil.self().addSchedule(jobName, ScheduleTask.class, cronExpression, map);
                log.info("\u521d\u59cb\u5316\u5b9a\u65f6\u4efb\u52a1\uff0c\u540d\u79f0\uff1a" + jobName + "\u8868\u8fbe\u5f0f\uff1a" + cronExpression + "\uff0c\u4e0b\u6b21\u6267\u884c\u65f6\u95f4\uff1a" + TimeUtil.date2Str(addSchedule.getNextFireTime().getTime()));
            }
            catch (SchedulerException e) {
                log.error("init error", (Throwable)e);
            }
        } else {
            try {
                Trigger addSchedule = SchedulerUtil.self().addSchedule(jobName, ScheduleTask.class, intervalInHours, intervalInMinutes, intervalInSeconds, intervalInMillis, repeatCount, map);
                log.info("\u521d\u59cb\u5316\u5b9a\u65f6\u4efb\u52a1\uff0c\u540d\u79f0\uff1a" + jobName + "\uff0c\u65f6\uff1a" + intervalInHours + "\uff0c\u5206\uff1a" + intervalInMinutes + "\uff0c\u79d2\uff1a" + intervalInSeconds + "\uff0c\u6beb\u79d2\uff1a" + intervalInMillis + "\uff0c\u91cd\u590d\u6b21\u6570\uff1a" + repeatCount + "\uff0c\u4e0b\u6b21\u6267\u884c\u65f6\u95f4\uff1a" + TimeUtil.date2Str(addSchedule.getNextFireTime().getTime()));
            }
            catch (SchedulerException e) {
                log.error("init error", (Throwable)e);
            }
        }
    }

    public class AddScheduleToServiceCenterProviderHandler
    implements IHandler<AddScheduleToServiceCenterProviderMessage> {
        @Override
        public void handle(AddScheduleToServiceCenterProviderMessage msg) {
            ((ServiceCenterX)msg.getExtra()).onAddSchedule(msg.jobName, msg.providerId, msg.cronExpression, msg.intervalInHours, msg.intervalInMinutes, msg.intervalInSeconds, msg.intervalInMillis, msg.repeatCount);
        }
    }

    public class PushServiceToServiceCenterProviderHandler
    implements IHandler<PushServiceToServiceCenterProviderMessage> {
        @Override
        public void handle(PushServiceToServiceCenterProviderMessage msg) {
            ((ServiceCenterX)msg.getExtra()).onProviderPublicServices(msg.getChannel(), msg.providerUID, msg.myIp, msg.myPort, msg.services);
        }
    }

    public class SubscribeServiceFromServiceCenterConsumerHandler
    implements IHandler<SubscribeServiceFromServiceCenterConsumerMessage> {
        @Override
        public void handle(SubscribeServiceFromServiceCenterConsumerMessage msg) {
            ServiceCenterX.this.registerClientSession(msg.getChannel());
            ((ServiceCenterX)msg.getExtra()).sendAllServiceProviderInfo2Consumer(msg.getChannel());
        }
    }
}

