/*
 * Decompiled with CFR 0.152.
 */
package org.correomqtt.business.mqtt;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttClientSslConfigBuilder;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5ReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuthBuilder;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublishBuilder;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5UnsubscribeBuilder;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.net.ssl.SSLException;
import org.correomqtt.business.exception.CorreoMqtt5SubscriptionFailed;
import org.correomqtt.business.exception.CorreoMqttConnectionFailedException;
import org.correomqtt.business.exception.CorreoMqttNotConnectedException;
import org.correomqtt.business.model.ConnectionConfigDTO;
import org.correomqtt.business.model.Lwt;
import org.correomqtt.business.model.MessageDTO;
import org.correomqtt.business.model.SubscriptionDTO;
import org.correomqtt.business.model.TlsSsl;
import org.correomqtt.business.mqtt.BaseCorreoMqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CorreoMqtt5Client
extends BaseCorreoMqttClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(CorreoMqtt5Client.class);
    private Mqtt5BlockingClient mqtt5BlockingClient;

    CorreoMqtt5Client(ConnectionConfigDTO configDTO) {
        super(configDTO);
    }

    @Override
    Logger getLogger() {
        return LOGGER;
    }

    @Override
    void executeConnect() throws SSLException, InterruptedException, ExecutionException, TimeoutException {
        Mqtt5ConnAck connAck;
        ConnectionConfigDTO configDTO = this.getConfigDTO();
        Mqtt5ClientBuilder clientBuilder = (Mqtt5ClientBuilder)((Mqtt5ClientBuilder)((Mqtt5ClientBuilder)MqttClient.builder().useMqttVersion5().identifier(configDTO.getClientId())).serverHost(configDTO.getUrl())).serverPort(this.getDestinationPort());
        if (configDTO.getSsl().equals(TlsSsl.KEYSTORE) && configDTO.getSslKeystore() != null && !configDTO.getSslKeystore().isEmpty()) {
            clientBuilder = (Mqtt5ClientBuilder)((MqttClientSslConfigBuilder.Nested)((MqttClientSslConfigBuilder.Nested)clientBuilder.sslConfig().keyManagerFactory(this.getKeyManagerFactory())).trustManagerFactory(this.getTrustManagerFactory())).applySslConfig();
        }
        clientBuilder.addDisconnectedListener((MqttClientDisconnectedListener)this);
        clientBuilder.addConnectedListener((MqttClientConnectedListener)this);
        this.mqtt5BlockingClient = clientBuilder.buildBlocking();
        Mqtt5ConnectBuilder.Send connBuilder = (Mqtt5ConnectBuilder.Send)((Mqtt5ConnectBuilder.Send)this.mqtt5BlockingClient.toAsync().connectWith().cleanStart(configDTO.isCleanSession())).keepAlive(10000);
        if (configDTO.getLwt().equals(Lwt.ON)) {
            ((Mqtt5WillPublishBuilder.Nested.Complete)((Mqtt5WillPublishBuilder.Nested.Complete)((Mqtt5WillPublishBuilder.Nested.Complete)((Mqtt5WillPublishBuilder.Nested.Complete)connBuilder.willPublish().topic(configDTO.getLwtTopic())).qos(configDTO.getLwtQoS().getMqttQos())).payload(configDTO.getLwtPayload().getBytes())).retain(configDTO.isLwtRetained())).applyWillPublish();
        }
        if (configDTO.getUsername() != null && configDTO.getPassword() != null && !configDTO.getUsername().isEmpty() && !configDTO.getPassword().isEmpty()) {
            ((Mqtt5SimpleAuthBuilder.Nested.Complete)((Mqtt5SimpleAuthBuilder.Nested.Complete)connBuilder.simpleAuth().username(configDTO.getUsername())).password(configDTO.getPassword().getBytes())).applySimpleAuth();
        }
        if ((connAck = (Mqtt5ConnAck)((CompletableFuture)connBuilder.send()).get(30L, TimeUnit.SECONDS)).getReasonCode().isError()) {
            this.closeIfConnectionExists();
            throw new CorreoMqttConnectionFailedException(connAck.getReasonCode());
        }
    }

    @Override
    void doReconnect(MqttClientDisconnectedContext context) {
        context.getReconnector().reconnect(true).delay(3000L, TimeUnit.MILLISECONDS);
    }

    private synchronized void closeIfConnectionExists() {
        if (this.mqtt5BlockingClient != null && this.mqtt5BlockingClient.getState().isConnectedOrReconnect()) {
            this.mqtt5BlockingClient.disconnect();
        }
    }

    @Override
    void doUnsubscribe(SubscriptionDTO subscriptionDTO) {
        ((Mqtt5UnsubscribeBuilder.Send.Complete)this.getCheckedClient().unsubscribeWith().topicFilter(subscriptionDTO.getTopic())).send();
    }

    @Override
    void doPublish(MessageDTO messageDTO) {
        messageDTO.setDateTime(LocalDateTime.now(ZoneOffset.UTC));
        ((Mqtt5PublishBuilder.Send.Complete)((Mqtt5PublishBuilder.Send.Complete)((Mqtt5PublishBuilder.Send.Complete)((Mqtt5PublishBuilder.Send.Complete)this.getCheckedClient().publishWith().topic(messageDTO.getTopic())).payload(messageDTO.getPayload().getBytes())).qos(messageDTO.getQos().getMqttQos())).retain(messageDTO.isRetained())).send();
    }

    @Override
    void doSubscribe(SubscriptionDTO subscriptionDTO, Consumer<MessageDTO> incomingCallback) throws InterruptedException, ExecutionException, TimeoutException {
        Mqtt5SubAck subAck = (Mqtt5SubAck)((Mqtt5AsyncClient.Mqtt5SubscribeAndCallbackBuilder.Start.Complete)((Mqtt5AsyncClient.Mqtt5SubscribeAndCallbackBuilder.Start.Complete)this.getCheckedAsyncClient().subscribeWith().topicFilter(subscriptionDTO.getTopic())).qos(subscriptionDTO.getQos().getMqttQos())).callback(mqtt5Publish -> incomingCallback.accept(new MessageDTO((Mqtt5Publish)mqtt5Publish))).send().get(10L, TimeUnit.SECONDS);
        List returnCodes = subAck.getReasonCodes();
        if (returnCodes.stream().anyMatch(Mqtt5ReasonCode::isError)) {
            throw new CorreoMqtt5SubscriptionFailed(returnCodes);
        }
    }

    @Override
    void doDisconnect() {
        this.getCheckedClient().disconnect();
    }

    @Override
    boolean isConnected() {
        return this.getCheckedClient().getState().isConnected();
    }

    private Mqtt5AsyncClient getCheckedAsyncClient() {
        return this.getCheckedClient().toAsync();
    }

    private Mqtt5BlockingClient getCheckedClient() {
        if (this.mqtt5BlockingClient == null) {
            throw new CorreoMqttNotConnectedException();
        }
        return this.mqtt5BlockingClient;
    }

    public Mqtt5BlockingClient getMqtt5BlockingClient() {
        return this.mqtt5BlockingClient;
    }
}

