/*
 * Decompiled with CFR 0.152.
 */
package org.correomqtt.business.mqtt;

import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientConnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedListener;
import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource;
import com.hivemq.client.util.KeyStoreUtil;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.connection.channel.direct.LocalPortForwarder;
import net.schmizz.sshj.connection.channel.direct.Parameters;
import net.schmizz.sshj.transport.verification.HostKeyVerifier;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
import org.correomqtt.business.dispatcher.ConnectionLifecycleDispatcher;
import org.correomqtt.business.exception.CorreoMqttAlreadySubscribedException;
import org.correomqtt.business.exception.CorreoMqttNoRetriesLeftException;
import org.correomqtt.business.exception.CorreoMqttSshFailedException;
import org.correomqtt.business.model.Auth;
import org.correomqtt.business.model.ConnectionConfigDTO;
import org.correomqtt.business.model.MessageDTO;
import org.correomqtt.business.model.Proxy;
import org.correomqtt.business.model.SubscriptionDTO;
import org.correomqtt.business.mqtt.CorreoMqttClient;
import org.slf4j.Logger;
import org.slf4j.MarkerFactory;

abstract class BaseCorreoMqttClient
implements CorreoMqttClient,
MqttClientDisconnectedListener,
MqttClientConnectedListener {
    private static final int MAX_RECONNECTS = 5;
    private final ConnectionConfigDTO configDTO;
    private final AtomicBoolean wasConnectedBefore = new AtomicBoolean(false);
    private final AtomicBoolean tryToReconnect = new AtomicBoolean(false);
    private final AtomicInteger triedReconnects = new AtomicInteger(0);
    private final Set<SubscriptionDTO> subscriptions = new HashSet<SubscriptionDTO>();
    private SSHClient sshClient;
    private LocalPortForwarder localPortforwarder;

    public BaseCorreoMqttClient(ConnectionConfigDTO configDTO) {
        this.configDTO = configDTO;
    }

    ConnectionConfigDTO getConfigDTO() {
        return this.configDTO;
    }

    abstract Logger getLogger();

    @Override
    public Set<SubscriptionDTO> getSubscriptions() {
        return new HashSet<SubscriptionDTO>(this.subscriptions);
    }

    @Override
    public synchronized void connect() throws InterruptedException, ExecutionException, TimeoutException, SSLException {
        if (this.configDTO.getProxy().equals(Proxy.SSH)) {
            try {
                this.setupSsh();
            }
            catch (IOException e) {
                this.getLogger().error(MarkerFactory.getMarker((String)this.configDTO.getName()), "Error while creating ssh connection. ", (Throwable)e);
                this.disconnect(false);
                throw new CorreoMqttSshFailedException(e);
            }
        }
        this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "Connecting to Broker using {}", (Object)this.configDTO.getMqttVersion().getDescription());
        this.executeConnect();
        this.wasConnectedBefore.set(true);
    }

    private void setupSsh() throws IOException, InterruptedException {
        this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "Creating SSH tunnel to {}:{}.", (Object)this.configDTO.getSshHost(), (Object)this.configDTO.getPort());
        this.sshClient = new SSHClient();
        this.sshClient.addHostKeyVerifier((HostKeyVerifier)new PromiscuousVerifier());
        this.sshClient.connect(this.configDTO.getSshHost(), this.configDTO.getSshPort());
        if (this.configDTO.getAuth().equals(Auth.PASSWORD)) {
            this.sshClient.authPassword(this.configDTO.getAuthUsername(), this.configDTO.getAuthPassword());
        } else if (this.configDTO.getAuth().equals(Auth.KEYFILE)) {
            this.sshClient.authPublickey(this.configDTO.getAuthUsername(), new String[]{this.configDTO.getAuthKeyfile()});
        }
        Parameters parameters = new Parameters("localhost", this.configDTO.getLocalPort(), this.configDTO.getUrl(), this.configDTO.getPort());
        Thread thread = new Thread(() -> {
            try (ServerSocket serverSocket = new ServerSocket();){
                serverSocket.setReuseAddress(true);
                serverSocket.bind(new InetSocketAddress(parameters.getLocalHost(), parameters.getLocalPort()));
                this.localPortforwarder = this.sshClient.newLocalPortForwarder(parameters, serverSocket);
                this.localPortforwarder.listen();
            }
            catch (Exception e) {
                this.getLogger().error(MarkerFactory.getMarker((String)this.configDTO.getName()), "SSH socket to {}:{} failed.", (Object)this.configDTO.getSshHost(), (Object)this.configDTO.getPort());
                throw new CorreoMqttSshFailedException(e);
            }
        });
        thread.start();
        int sshRetries = 0;
        while (!this.sshClient.isConnected()) {
            if (sshRetries < 5) {
                ++sshRetries;
                TimeUnit.SECONDS.sleep(1L);
                continue;
            }
            this.getLogger().error(MarkerFactory.getMarker((String)this.configDTO.getName()), "SSH tunnel to {}:{} failed.", (Object)this.configDTO.getSshHost(), (Object)this.configDTO.getPort());
            return;
        }
        this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "SSH tunnel to {}:{} established.", (Object)this.configDTO.getSshHost(), (Object)this.configDTO.getPort());
    }

    protected int getDestinationPort() {
        if (this.configDTO.getProxy().equals(Proxy.SSH)) {
            return this.configDTO.getLocalPort();
        }
        return this.configDTO.getPort();
    }

    abstract void executeConnect() throws SSLException, InterruptedException, ExecutionException, TimeoutException;

    public void onDisconnected(MqttClientDisconnectedContext context) {
        if (this.tryToReconnect.get()) {
            if (context.getSource() == MqttDisconnectSource.SERVER) {
                this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "Disconnected by {}. Connection to broker lost.", (Object)context.getSource());
                this.reconnect(context);
            } else if (context.getSource() == MqttDisconnectSource.CLIENT) {
                this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "Disconnected by {}. Connection to broker not possible.", (Object)context.getSource());
                this.reconnect(context);
            } else if (context.getSource() == MqttDisconnectSource.USER) {
                try {
                    this.localPortforwarder.close();
                    this.sshClient.disconnect();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
                this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "Disconnected by {}. Connection to broker disconnected by user.", (Object)context.getSource());
            }
        }
    }

    public void onConnected(MqttClientConnectedContext context) {
        this.triedReconnects.set(0);
        this.tryToReconnect.set(true);
        if (this.wasConnectedBefore.get()) {
            ConnectionLifecycleDispatcher.getInstance().onConnectionReconnected(this.configDTO.getId());
            this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "Reconnected to broker successfully");
        }
    }

    KeyManagerFactory getKeyManagerFactory() throws SSLException {
        return KeyStoreUtil.keyManagerFromKeystore((File)new File(this.configDTO.getSslKeystore()), (String)this.configDTO.getSslKeystorePassword(), (String)this.configDTO.getSslKeystorePassword());
    }

    TrustManagerFactory getTrustManagerFactory() throws SSLException {
        return KeyStoreUtil.trustManagerFromKeystore((File)new File(this.configDTO.getSslKeystore()), (String)this.configDTO.getSslKeystorePassword());
    }

    @Override
    public synchronized void unsubscribe(SubscriptionDTO subscriptionDTO) {
        this.doUnsubscribe(subscriptionDTO);
        this.subscriptions.remove(subscriptionDTO);
    }

    abstract void doUnsubscribe(SubscriptionDTO var1);

    @Override
    public synchronized void publish(MessageDTO messageDTO) throws InterruptedException, ExecutionException, TimeoutException {
        this.doPublish(messageDTO);
    }

    abstract void doPublish(MessageDTO var1) throws InterruptedException, ExecutionException, TimeoutException;

    abstract void doSubscribe(SubscriptionDTO var1, Consumer<MessageDTO> var2) throws InterruptedException, ExecutionException, TimeoutException;

    @Override
    public synchronized void subscribe(SubscriptionDTO subscriptionDTO, Consumer<MessageDTO> incomingCallback) throws InterruptedException, ExecutionException, TimeoutException {
        if (this.subscriptions.contains(subscriptionDTO)) {
            throw new CorreoMqttAlreadySubscribedException(this.getConfigDTO().getId(), subscriptionDTO);
        }
        this.doSubscribe(subscriptionDTO, incomingCallback);
        this.subscriptions.add(subscriptionDTO);
    }

    private synchronized void reconnect(MqttClientDisconnectedContext context) {
        this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "Reconnecting connect to Broker.");
        if (this.tryToReconnect.get() && this.triedReconnects.get() < 5 && context.getSource() != MqttDisconnectSource.USER) {
            this.doReconnect(context);
            ConnectionLifecycleDispatcher.getInstance().onReconnectFailed(this.configDTO.getId(), this.triedReconnects, 5);
            this.triedReconnects.incrementAndGet();
        } else {
            this.getLogger().error(MarkerFactory.getMarker((String)this.configDTO.getName()), "Maximum number of reconnects reached.");
            ConnectionLifecycleDispatcher.getInstance().onConnectionFailed(this.configDTO.getId(), new CorreoMqttNoRetriesLeftException());
        }
    }

    abstract void doReconnect(MqttClientDisconnectedContext var1);

    @Override
    public synchronized void disconnect(boolean graceful) {
        this.tryToReconnect.set(false);
        if (this.isConnected()) {
            this.doDisconnect();
            this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "Disconnected from broker.");
        } else {
            this.getLogger().info("Disconnecting client was not possible, cause was not connected.");
        }
        if (this.sshClient != null && this.sshClient.isConnected()) {
            this.getLogger().debug(MarkerFactory.getMarker((String)this.configDTO.getName()), "Disconnecting SSH tunnel for {}:{}.", (Object)this.configDTO.getSshHost(), (Object)this.configDTO.getPort());
            try {
                this.sshClient.close();
                this.getLogger().info(MarkerFactory.getMarker((String)this.configDTO.getName()), "SSH tunnel for {}:{} closed", (Object)this.configDTO.getSshHost(), (Object)this.configDTO.getPort());
            }
            catch (IOException e) {
                this.getLogger().error(MarkerFactory.getMarker((String)this.configDTO.getName()), "Disconnecting SSH tunnel for {}:{} failed", (Object)this.configDTO.getSshHost(), (Object)this.configDTO.getPort());
            }
            this.sshClient = null;
        }
    }

    abstract void doDisconnect();

    abstract boolean isConnected();
}

