/*
 * Decompiled with CFR 0.152.
 */
package me.hekr.iotos.softgateway.network.mqtt;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import me.hekr.iotos.softgateway.network.mqtt.ConnectionContext;
import me.hekr.iotos.softgateway.network.mqtt.MqttConnections;
import me.hekr.iotos.softgateway.network.mqtt.PacketCoder;
import me.hekr.iotos.softgateway.network.mqtt.listener.Listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttServer<T> {
    private static final Logger log = LoggerFactory.getLogger(MqttServer.class);
    private final int poolSize = Runtime.getRuntime().availableProcessors() * 4;
    PacketCoder<T> packetCoder;
    private Vertx vertx;
    private io.vertx.mqtt.MqttServer server;
    private int port;
    private Listener<T> listener;

    public MqttServer(int port) {
        this.port = port;
    }

    public MqttServer() {
        this(1883);
    }

    public void start() {
        Objects.requireNonNull(this.packetCoder, "packetCoder \u5fc5\u586b");
        Objects.requireNonNull(this.listener, "listener \u5fc5\u586b");
        VertxOptions options = new VertxOptions();
        options.setEventLoopPoolSize(this.poolSize);
        options.setWorkerPoolSize(this.poolSize);
        this.vertx = Vertx.vertx((VertxOptions)options);
        this.server = io.vertx.mqtt.MqttServer.create((Vertx)this.vertx);
        this.server.exceptionHandler(t -> log.error(t.getMessage(), t));
        this.server.endpointHandler(endpoint -> {
            ConnectionContext<T> context = MqttConnections.add(endpoint);
            context.setPacketCoder(this.packetCoder);
            endpoint.autoKeepAlive(true);
            endpoint.publishAutoAck(true);
            this.handleClose((MqttEndpoint)endpoint);
            this.handlePublishMessage((MqttEndpoint)endpoint);
            this.handleAuth((MqttEndpoint)endpoint);
            this.handleException((MqttEndpoint)endpoint);
            this.handleSubscribe((MqttEndpoint)endpoint);
        });
        this.server.listen(this.port).onComplete(ar -> {
            if (ar.succeeded()) {
                log.info("mqtt server \u542f\u52a8\u6210\u529f\uff0c\u7ed1\u5b9a\u7aef\u53e3\uff1a{}", (Object)this.server.actualPort());
            } else {
                log.error("mqtt server \u542f\u52a8\u5931\u8d25\uff0c" + ar.cause().getMessage(), ar.cause());
            }
        });
    }

    private void handleSubscribe(MqttEndpoint endpoint) {
        endpoint.subscribeHandler(m -> {
            ConnectionContext context = MqttConnections.get(endpoint);
            List<MqttQoS> qosList = this.listener.aclSubTopic(context, m.topicSubscriptions());
            if (log.isDebugEnabled()) {
                for (int i = 0; i < m.topicSubscriptions().size(); ++i) {
                    MqttTopicSubscription topic = (MqttTopicSubscription)m.topicSubscriptions().get(i);
                    MqttQoS qos = qosList.get(i);
                    log.debug((qos == MqttQoS.FAILURE ? "\u4e0d\u5141\u8bb8 " : "\u5141\u8bb8 ") + context.getClientId() + " \u8ba2\u9605 " + topic.topicName() + " qos:" + topic.qualityOfService() + " \u534f\u5546 qos:" + qos);
                }
            }
            endpoint.subscribeAcknowledge(m.messageId(), qosList);
        });
    }

    private void handleException(MqttEndpoint endpoint) {
        endpoint.exceptionHandler(t -> log.error(t.getMessage(), t));
    }

    private void handleClose(MqttEndpoint endpoint) {
        endpoint.closeHandler(v -> this.listener.onClose(MqttConnections.remove(endpoint)));
    }

    private void handlePublishMessage(MqttEndpoint endpoint) {
        endpoint.publishHandler(m -> {
            ConnectionContext context = MqttConnections.get(endpoint);
            boolean pass = this.listener.aclPubTopic(context, m.topicName(), m.qosLevel());
            if (!pass && log.isDebugEnabled()) {
                log.debug("publish topic \u62d2\u7edd\uff0c\u4e22\u5f03\u6d88\u606f, clientId: {}, username: {}, topic: {}, qos:{}", new Object[]{context.getClientId(), context.getUsername(), m.topicName(), m.qosLevel()});
                return;
            }
            this.listener.onMessage(context, m.topicName(), m.qosLevel(), this.packetCoder.decode(m.payload().getBytes()));
        });
    }

    private void handleAuth(MqttEndpoint endpoint) {
        MqttAuth auth = endpoint.auth();
        String username = auth == null ? null : auth.getUsername();
        String password = auth == null ? null : auth.getPassword();
        ConnectionContext context = MqttConnections.get(endpoint);
        context.setUsername(username);
        context.setPassword(password);
        boolean authResult = this.listener.auth(context);
        context.setAuth(authResult);
        context.setAuthTime(LocalDateTime.now());
        if (authResult) {
            endpoint.accept(false);
        } else {
            endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
        }
    }

    public void close() {
        if (this.server != null) {
            this.server.close();
        }
        if (this.vertx != null) {
            this.vertx.close();
        }
    }

    public void setPacketCoder(PacketCoder<T> packetCoder) {
        this.packetCoder = packetCoder;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void setListener(Listener<T> listener) {
        this.listener = listener;
    }
}

