/*
 * Decompiled with CFR 0.152.
 */
package org.zstacks.zbus.client.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.client.Broker;
import org.zstacks.zbus.client.ClientHint;
import org.zstacks.zbus.client.ZbusException;
import org.zstacks.zbus.client.broker.HaBrokerConfig;
import org.zstacks.zbus.client.broker.SingleBroker;
import org.zstacks.zbus.client.broker.TrackAgent;
import org.zstacks.zbus.client.broker.TrackListener;
import org.zstacks.zbus.protocol.MessageMode;
import org.zstacks.zbus.protocol.MqInfo;
import org.zstacks.zbus.protocol.TrackTable;
import org.zstacks.znet.Helper;
import org.zstacks.znet.Message;
import org.zstacks.znet.RemotingClient;
import org.zstacks.znet.nio.Dispatcher;
import org.zstacks.znet.ticket.ResultCallback;

/**
 * {@link Broker} implementation that routes requests across several {@link SingleBroker}
 * instances, one per broker address listed in the most recently received {@link TrackTable}.
 *
 * <p>The set of brokers is maintained by {@link #onTrackTableUpdated(TrackTable)}, which this
 * class registers for on the {@link TrackAgent} it creates in its constructor. The current
 * track table is held in a {@code volatile} field; methods that consult it more than once take
 * a local snapshot first so that a single call never mixes two different tables.
 */
public class HaBroker
implements Broker,
TrackListener {
    private static final Logger log = LoggerFactory.getLogger(HaBroker.class);
    /** Local IP, used as the fallback selection key in {@link #getClient(ClientHint)}. */
    private final String requestIp = Helper.getLocalIp();
    /** Latest table delivered through {@link #onTrackTableUpdated(TrackTable)}. */
    private volatile TrackTable trackTable = new TrackTable();
    private final String trackAddressList;
    private final HaBrokerConfig config;
    public TrackAgent trackAgent;
    private final Dispatcher dispatcher;
    /** {@code true} when the dispatcher was created here and must therefore be closed here. */
    private final boolean ownDispatcher;
    /** Broker address -> broker currently considered alive. */
    private final Map<String, SingleBroker> brokers = new ConcurrentHashMap<String, SingleBroker>();

    /**
     * Creates the broker, starts the dispatcher and the track agent, and waits (up to 3000,
     * as passed to {@code TrackAgent.waitForReady}) for the track agent to become ready.
     *
     * <p>If {@code config} carries no dispatcher, a new one is created, stored back into
     * {@code config}, and owned (closed) by this instance.
     *
     * @param config configuration providing the track address list and optional dispatcher
     * @throws IOException declared by the original constructor; raised by the underlying setup calls
     */
    public HaBroker(HaBrokerConfig config) throws IOException {
        this.config = config;
        this.trackAddressList = config.getTrackAddrList();
        if (config.getDispatcher() == null) {
            this.ownDispatcher = true;
            this.dispatcher = new Dispatcher();
            this.config.setDispatcher(this.dispatcher);
        } else {
            this.dispatcher = config.getDispatcher();
            this.ownDispatcher = false;
        }
        this.dispatcher.start();
        this.trackAgent = new TrackAgent(this.trackAddressList, this.dispatcher);
        this.trackAgent.addTrackListener(this);
        this.trackAgent.waitForReady(3000L);
    }

    /** Returns the broker registered for {@code address}, or {@code null} if there is none. */
    private SingleBroker getBrokerByAddress(String address) {
        return this.brokers.get(address);
    }

    /**
     * Returns the broker registered for {@code brokerAddress}, logging and throwing when absent.
     *
     * @throws ZbusException if no broker is registered under that address
     */
    private SingleBroker requireBroker(String brokerAddress) {
        SingleBroker broker = this.getBrokerByAddress(brokerAddress);
        if (broker == null) {
            String errorMsg = brokerAddress + " zbus broker missing";
            log.error(errorMsg);
            throw new ZbusException(errorMsg);
        }
        return broker;
    }

    /**
     * Installs the new track table and reconciles the broker map with it: a
     * {@link SingleBroker} is created for every address not yet known, and brokers whose
     * address is no longer listed are closed and removed.
     *
     * @param trackTable the table just received from the track agent
     */
    @Override
    public void onTrackTableUpdated(TrackTable trackTable) {
        this.trackTable = trackTable;
        for (String brokerAddress : trackTable.brokerAddresses()) {
            if (this.brokers.containsKey(brokerAddress)) {
                continue;
            }
            HaBrokerConfig singleConfig = this.config.clone();
            singleConfig.setBrokerAddress(brokerAddress);
            try {
                this.brokers.put(brokerAddress, new SingleBroker(singleConfig));
            }
            catch (IOException e) {
                // Creation failures are logged only; the address stays absent from the map.
                log.error(e.getMessage(), (Throwable)e);
            }
        }
        Iterator<Map.Entry<String, SingleBroker>> iter = this.brokers.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, SingleBroker> entry = iter.next();
            // Compare against the table handed to this call, not the volatile field, which a
            // concurrent update may already have replaced.
            if (trackTable.brokerAddresses().contains(entry.getKey())) {
                continue;
            }
            try {
                entry.getValue().close();
            }
            catch (IOException e) {
                log.error(e.getMessage(), (Throwable)e);
            }
            iter.remove();
        }
    }

    /**
     * Closes every registered broker, then the dispatcher (only if it was created by this
     * instance), then the track agent.
     *
     * <p>All resources are attempted even when one of them fails. Dispatcher and track agent
     * failures are logged only; broker failures are logged and the first one is rethrown after
     * everything has been attempted.
     *
     * @throws IOException the first failure raised while closing a broker
     */
    @Override
    public void close() throws IOException {
        IOException firstFailure = null;
        for (SingleBroker broker : this.brokers.values()) {
            try {
                broker.close();
            }
            catch (IOException e) {
                log.error(e.getMessage(), (Throwable)e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (this.ownDispatcher && this.dispatcher != null) {
            try {
                this.dispatcher.close();
            }
            catch (IOException e) {
                log.error(e.getMessage(), (Throwable)e);
            }
        }
        try {
            this.trackAgent.close();
        }
        catch (IOException e) {
            log.error(e.getMessage(), (Throwable)e);
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Returns {@code true} when the message's command is {@code "admin"}. */
    private boolean isAdmin(Message msg) {
        return "admin".equals(msg.getCommand());
    }

    /**
     * Sends {@code msg} asynchronously through the broker registered for {@code brokerAddress}.
     *
     * @throws ZbusException if no broker is registered under that address
     */
    private void invokeAsyncByBroker(String brokerAddress, Message msg, ResultCallback callback) throws IOException {
        this.requireBroker(brokerAddress).invokeAsync(msg, callback);
    }

    /**
     * Sends {@code msg} synchronously through the broker registered for {@code brokerAddress}.
     *
     * @return whatever the selected broker's {@code invokeSync} returns
     * @throws ZbusException if no broker is registered under that address
     */
    private Message invokeSyncByBroker(String brokerAddress, Message msg, int timeout) throws IOException {
        return this.requireBroker(brokerAddress).invokeSync(msg, timeout);
    }

    /**
     * Picks the broker address for an admin message: the broker of the first {@link MqInfo}
     * known for {@code mq}, or, when {@code mq} is {@code null} or unknown, the first address
     * returned by the track table.
     *
     * @param mq queue name carried by the message; may be {@code null}
     * @throws ZbusException if the track table lists no broker at all
     */
    private String getBrokerAddressForAdmin(String mq) {
        // One snapshot so the emptiness check and the lookup see the same table.
        TrackTable table = this.trackTable;
        List<MqInfo> mqInfos = null;
        if (mq != null) {
            mqInfos = table.getMqInfo(mq);
        }
        if (mqInfos != null && !mqInfos.isEmpty()) {
            return mqInfos.get(0).getBroker();
        }
        if (table.brokerAddresses().size() == 0) {
            throw new ZbusException("ZbusServer not exists");
        }
        return table.brokerAddresses().iterator().next();
    }

    /**
     * Routes {@code msg} asynchronously.
     *
     * <p>Routing order: admin messages go to {@link #getBrokerAddressForAdmin(String)}; a
     * message naming a broker goes to that broker; otherwise the track table entry for the
     * message's MQ decides. When that MQ has the PubSub mode bit enabled the message is sent once to each
     * listed broker (so {@code callback} is passed to each of those sends); otherwise it is
     * sent to the first listed broker only.
     *
     * @throws ZbusException if no broker can be determined or the chosen broker is not registered
     * @throws IOException if the underlying broker call fails
     */
    @Override
    public void invokeAsync(Message msg, ResultCallback callback) throws IOException {
        if (this.isAdmin(msg)) {
            String brokerAddress = this.getBrokerAddressForAdmin(msg.getMq());
            this.invokeAsyncByBroker(brokerAddress, msg, callback);
            return;
        }
        String brokerAddress = msg.getBroker();
        if (brokerAddress != null) {
            this.invokeAsyncByBroker(brokerAddress, msg, callback);
            return;
        }
        List<MqInfo> mqInfos = this.trackTable.getMqInfo(msg.getMq());
        if (mqInfos == null || mqInfos.isEmpty()) {
            throw new ZbusException("no broker available");
        }
        int mode = mqInfos.get(0).getMode();
        if (MessageMode.isEnabled(mode, MessageMode.PubSub)) {
            for (MqInfo info : mqInfos) {
                this.invokeAsyncByBroker(info.getBroker(), msg, callback);
            }
            // Stop here: falling through would deliver a second copy to the first broker.
            return;
        }
        this.invokeAsyncByBroker(mqInfos.get(0).getBroker(), msg, callback);
    }

    /**
     * Routes {@code msg} synchronously, using the same routing order as
     * {@link #invokeAsync(Message, ResultCallback)}.
     *
     * @return the broker's reply; in PubSub mode, the reply of the last broker invoked
     * @throws ZbusException if no broker can be determined or the chosen broker is not registered
     * @throws IOException if the underlying broker call fails
     */
    @Override
    public Message invokeSync(Message msg, int timeout) throws IOException {
        if (this.isAdmin(msg)) {
            String brokerAddress = this.getBrokerAddressForAdmin(msg.getMq());
            return this.invokeSyncByBroker(brokerAddress, msg, timeout);
        }
        String brokerAddress = msg.getBroker();
        if (brokerAddress != null) {
            return this.invokeSyncByBroker(brokerAddress, msg, timeout);
        }
        List<MqInfo> mqInfos = this.trackTable.getMqInfo(msg.getMq());
        if (mqInfos == null || mqInfos.isEmpty()) {
            throw new ZbusException("no broker available");
        }
        int mode = mqInfos.get(0).getMode();
        if (MessageMode.isEnabled(mode, MessageMode.PubSub)) {
            Message res = null;
            for (MqInfo info : mqInfos) {
                res = this.invokeSyncByBroker(info.getBroker(), msg, timeout);
            }
            return res;
        }
        return this.invokeSyncByBroker(mqInfos.get(0).getBroker(), msg, timeout);
    }

    /**
     * Obtains a client from the broker registered for {@code brokerAddress}, passing it a
     * fresh {@link ClientHint} that names only that broker.
     *
     * @throws IOException if no broker is registered under that address
     */
    private RemotingClient getClientByBroker(String brokerAddress) throws IOException {
        SingleBroker broker = this.getBrokerByAddress(brokerAddress);
        if (broker == null) {
            String errorMsg = brokerAddress + " zbus broker missing";
            log.error(errorMsg);
            throw new IOException(errorMsg);
        }
        ClientHint hint = new ClientHint();
        hint.setBroker(brokerAddress);
        return broker.getClient(hint);
    }

    /**
     * Selects a broker for {@code hint} and returns a client from it.
     *
     * <p>Selection order: the broker named by the hint; else, if the hint names an MQ whose
     * last {@link MqInfo} reports unconsumed messages, that entry's broker; else a broker
     * chosen from the track table's address list by hashing the request IP (the hint's, or
     * this host's local IP when the hint has none).
     *
     * @throws IOException if the track table lists no broker or the chosen broker is not registered
     */
    @Override
    public RemotingClient getClient(ClientHint hint) throws IOException {
        String brokerAddress = hint.getBroker();
        if (brokerAddress != null) {
            return this.getClientByBroker(brokerAddress);
        }
        // One snapshot so the MQ lookup and the address list come from the same table.
        TrackTable table = this.trackTable;
        String mq = hint.getMq();
        if (mq != null) {
            List<MqInfo> mqInfos = table.getMqInfo(mq);
            if (mqInfos != null && !mqInfos.isEmpty()) {
                MqInfo info = mqInfos.get(mqInfos.size() - 1);
                if (info.getUnconsumedMsgCount() > 0L) {
                    return this.getClientByBroker(info.getBroker());
                }
            }
        }
        List<String> list = new ArrayList<String>(table.brokerAddresses());
        if (list.isEmpty()) {
            throw new IOException("no broker available");
        }
        String requestIp = hint.getRequestIp();
        if (requestIp == null) {
            requestIp = this.requestIp;
        }
        // Reduce before taking the absolute value: Math.abs(Integer.MIN_VALUE) is negative
        // and would otherwise produce a negative index. All other hashes map as before.
        int index = Math.abs(requestIp.hashCode() % list.size());
        return this.getClientByBroker(list.get(index));
    }

    /**
     * Returns {@code client} to the broker it reports as its own; if that broker is no longer
     * registered, a warning is logged and the client is closed directly.
     */
    @Override
    public void closeClient(RemotingClient client) throws IOException {
        String brokerAddress = client.getBrokerAddress();
        SingleBroker broker = this.getBrokerByAddress(brokerAddress);
        if (broker != null) {
            broker.closeClient(client);
        } else {
            log.warn("unable to find client's broker");
            client.close();
        }
    }
}

