/*
 * Decompiled with CFR 0.152.
 */
package org.correomqtt.business.mqtt;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttClientSslConfigBuilder;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder;
import com.hivemq.client.mqtt.mqtt3.message.Mqtt3ReturnCode;
import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuthBuilder;
import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3WillPublishBuilder;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAck;
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3UnsubscribeBuilder;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.net.ssl.SSLException;
import org.correomqtt.business.exception.CorreoMqtt3SubscriptionFailed;
import org.correomqtt.business.exception.CorreoMqttConnectionFailedException;
import org.correomqtt.business.exception.CorreoMqttNotConnectedException;
import org.correomqtt.business.model.ConnectionConfigDTO;
import org.correomqtt.business.model.Lwt;
import org.correomqtt.business.model.MessageDTO;
import org.correomqtt.business.model.SubscriptionDTO;
import org.correomqtt.business.model.TlsSsl;
import org.correomqtt.business.mqtt.BaseCorreoMqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CorreoMqtt3Client
extends BaseCorreoMqttClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(CorreoMqtt3Client.class);
    private Mqtt3BlockingClient mqtt3BlockingClient;

    CorreoMqtt3Client(ConnectionConfigDTO configDTO) {
        super(configDTO);
    }

    @Override
    Logger getLogger() {
        return LOGGER;
    }

    @Override
    void executeConnect() throws SSLException, InterruptedException, ExecutionException, TimeoutException {
        Mqtt3ConnAck connAck;
        ConnectionConfigDTO configDTO = this.getConfigDTO();
        Mqtt3ClientBuilder clientBuilder = (Mqtt3ClientBuilder)((Mqtt3ClientBuilder)((Mqtt3ClientBuilder)MqttClient.builder().useMqttVersion3().identifier(configDTO.getClientId())).serverHost(configDTO.getUrl())).serverPort(this.getDestinationPort());
        if (configDTO.getSsl().equals(TlsSsl.KEYSTORE) && configDTO.getSslKeystore() != null && !configDTO.getSslKeystore().isEmpty()) {
            clientBuilder = (Mqtt3ClientBuilder)((MqttClientSslConfigBuilder.Nested)((MqttClientSslConfigBuilder.Nested)clientBuilder.sslConfig().keyManagerFactory(this.getKeyManagerFactory())).trustManagerFactory(this.getTrustManagerFactory())).applySslConfig();
        }
        clientBuilder.addDisconnectedListener((MqttClientDisconnectedListener)this);
        clientBuilder.addConnectedListener((MqttClientConnectedListener)this);
        this.mqtt3BlockingClient = clientBuilder.buildBlocking();
        Mqtt3ConnectBuilder.Send connBuilder = (Mqtt3ConnectBuilder.Send)((Mqtt3ConnectBuilder.Send)this.mqtt3BlockingClient.toAsync().connectWith().cleanSession(configDTO.isCleanSession())).keepAlive(10);
        if (configDTO.getLwt().equals(Lwt.ON)) {
            ((Mqtt3WillPublishBuilder.Nested.Complete)((Mqtt3WillPublishBuilder.Nested.Complete)((Mqtt3WillPublishBuilder.Nested.Complete)((Mqtt3WillPublishBuilder.Nested.Complete)connBuilder.willPublish().topic(configDTO.getLwtTopic())).qos(configDTO.getLwtQoS().getMqttQos())).payload(configDTO.getLwtPayload().getBytes())).retain(configDTO.isLwtRetained())).applyWillPublish();
        }
        if (configDTO.getUsername() != null && configDTO.getPassword() != null && !configDTO.getUsername().isEmpty() && !configDTO.getPassword().isEmpty()) {
            ((Mqtt3SimpleAuthBuilder.Nested.Complete)((Mqtt3SimpleAuthBuilder.Nested.Complete)connBuilder.simpleAuth().username(configDTO.getUsername())).password(configDTO.getPassword().getBytes())).applySimpleAuth();
        }
        if ((connAck = (Mqtt3ConnAck)((CompletableFuture)connBuilder.send()).get(10L, TimeUnit.SECONDS)).getReturnCode().isError()) {
            this.closeIfConnectionExists();
            throw new CorreoMqttConnectionFailedException(connAck.getReturnCode());
        }
    }

    @Override
    void doReconnect(MqttClientDisconnectedContext context) {
        context.getReconnector().reconnect(true).delay(3000L, TimeUnit.MILLISECONDS);
    }

    private synchronized void closeIfConnectionExists() {
        if (this.mqtt3BlockingClient != null && this.mqtt3BlockingClient.getState().isConnectedOrReconnect()) {
            this.mqtt3BlockingClient.disconnect();
        }
    }

    @Override
    void doUnsubscribe(SubscriptionDTO subscriptionDTO) {
        ((Mqtt3UnsubscribeBuilder.SendVoid.Complete)this.getCheckedClient().unsubscribeWith().topicFilter(subscriptionDTO.getTopic())).send();
    }

    @Override
    void doPublish(MessageDTO messageDTO) throws InterruptedException, ExecutionException, TimeoutException {
        messageDTO.setDateTime(LocalDateTime.now(ZoneOffset.UTC));
        ((CompletableFuture)((Mqtt3PublishBuilder.Send.Complete)((Mqtt3PublishBuilder.Send.Complete)((Mqtt3PublishBuilder.Send.Complete)((Mqtt3PublishBuilder.Send.Complete)this.getCheckedAsyncClient().publishWith().topic(messageDTO.getTopic())).payload(messageDTO.getPayload().getBytes())).qos(messageDTO.getQos().getMqttQos())).retain(messageDTO.isRetained())).send()).get(10L, TimeUnit.SECONDS);
    }

    @Override
    void doSubscribe(SubscriptionDTO subscriptionDTO, Consumer<MessageDTO> incomingCallback) throws InterruptedException, ExecutionException, TimeoutException {
        Mqtt3SubAck subAck = (Mqtt3SubAck)((Mqtt3AsyncClient.Mqtt3SubscribeAndCallbackBuilder.Start.Complete)((Mqtt3AsyncClient.Mqtt3SubscribeAndCallbackBuilder.Start.Complete)this.getCheckedAsyncClient().subscribeWith().topicFilter(subscriptionDTO.getTopic())).qos(subscriptionDTO.getQos().getMqttQos())).callback(mqtt3Publish -> incomingCallback.accept(new MessageDTO((Mqtt3Publish)mqtt3Publish))).send().get(10L, TimeUnit.SECONDS);
        List returnCodes = subAck.getReturnCodes();
        if (returnCodes.stream().anyMatch(Mqtt3ReturnCode::isError)) {
            throw new CorreoMqtt3SubscriptionFailed(returnCodes);
        }
    }

    @Override
    void doDisconnect() {
        this.getCheckedClient().disconnect();
    }

    @Override
    boolean isConnected() {
        if (this.mqtt3BlockingClient == null) {
            return false;
        }
        return this.mqtt3BlockingClient.getState().isConnected();
    }

    private Mqtt3AsyncClient getCheckedAsyncClient() {
        return this.getCheckedClient().toAsync();
    }

    private Mqtt3BlockingClient getCheckedClient() {
        if (this.mqtt3BlockingClient == null) {
            throw new CorreoMqttNotConnectedException();
        }
        return this.mqtt3BlockingClient;
    }

    public Mqtt3BlockingClient getMqtt3BlockingClient() {
        return this.mqtt3BlockingClient;
    }
}

