/*
 * Decompiled with CFR 0.152.
 */
package org.astarteplatform.devicesdk.transport.mqtt;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.zip.InflaterInputStream;
import org.astarteplatform.devicesdk.AstartePropertyStorageException;
import org.astarteplatform.devicesdk.protocol.AstarteAggregateDatastreamEvent;
import org.astarteplatform.devicesdk.protocol.AstarteAggregateDatastreamEventListener;
import org.astarteplatform.devicesdk.protocol.AstarteAggregateDatastreamInterface;
import org.astarteplatform.devicesdk.protocol.AstarteDatastreamEvent;
import org.astarteplatform.devicesdk.protocol.AstarteDatastreamEventListener;
import org.astarteplatform.devicesdk.protocol.AstarteDatastreamInterface;
import org.astarteplatform.devicesdk.protocol.AstarteDevicePropertyInterface;
import org.astarteplatform.devicesdk.protocol.AstarteInterface;
import org.astarteplatform.devicesdk.protocol.AstarteInterfaceDatastreamMapping;
import org.astarteplatform.devicesdk.protocol.AstarteInterfaceMapping;
import org.astarteplatform.devicesdk.protocol.AstarteInterfaceMappingNotFoundException;
import org.astarteplatform.devicesdk.protocol.AstartePropertyEvent;
import org.astarteplatform.devicesdk.protocol.AstartePropertyEventListener;
import org.astarteplatform.devicesdk.protocol.AstarteProtocolType;
import org.astarteplatform.devicesdk.protocol.AstarteServerAggregateDatastreamInterface;
import org.astarteplatform.devicesdk.protocol.AstarteServerDatastreamInterface;
import org.astarteplatform.devicesdk.protocol.AstarteServerPropertyInterface;
import org.astarteplatform.devicesdk.transport.AstarteFailedMessage;
import org.astarteplatform.devicesdk.transport.AstarteFailedMessageStorage;
import org.astarteplatform.devicesdk.transport.AstarteTransportException;
import org.astarteplatform.devicesdk.transport.mqtt.AstarteMqttTransport;
import org.astarteplatform.devicesdk.transport.mqtt.MutualSSLAuthenticationMqttConnectionInfo;
import org.bson.BSONCallback;
import org.bson.BSONDecoder;
import org.bson.BSONObject;
import org.bson.BasicBSONCallback;
import org.bson.BasicBSONDecoder;
import org.bson.BsonBinaryWriter;
import org.bson.BsonWriter;
import org.bson.Document;
import org.bson.codecs.DocumentCodec;
import org.bson.codecs.EncoderContext;
import org.bson.io.BasicOutputBuffer;
import org.bson.io.BsonOutput;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.joda.time.DateTime;

public class AstarteMqttV1Transport
extends AstarteMqttTransport {
    // Prefix of every topic this transport publishes to or subscribes on; the constructor
    // sets it to the connection's client id.
    private final String m_baseTopic;
    // Decoder and callback used by messageArrived to parse incoming BSON payloads; the
    // callback is reset before each decode, so one instance is reused across messages.
    private final BSONDecoder mBSONDecoder = new BasicBSONDecoder();
    private final BSONCallback mBSONCallback = new BasicBSONCallback();
    /**
     * MQTT callback registered on the client by the constructor. It logs connection
     * changes, forwards connection loss to the transport event listener, and decodes
     * incoming messages into Astarte events delivered to the listeners of the target
     * server-owned interface.
     */
    private final MqttCallback mMqttCallback = new MqttCallbackExtended(){

        /** Logs whether the completed connection was a reconnection or a first connection. */
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            if (reconnect) {
                System.out.println("Reconnected to : " + serverURI);
            } else {
                System.out.println("Connected to: " + serverURI);
            }
        }

        /** Notifies the transport event listener when one is set; otherwise only logs. */
        @Override
        public void connectionLost(Throwable cause) {
            if (AstarteMqttV1Transport.this.m_astarteTransportEventListener != null) {
                AstarteMqttV1Transport.this.m_astarteTransportEventListener.onTransportDisconnected();
            } else {
                System.out.println("The Connection was lost.");
            }
        }

        /**
         * Routes an incoming message.
         *
         * <p>Messages are ignored when the topic does not contain the base topic or when no
         * message listener is set. Topics under {@code control} are handled as control
         * messages (only {@code control/consumer/properties} is supported). Any other topic
         * is read as {@code <interface name><path>}; the BSON payload's {@code "v"} field is
         * the value and the optional {@code "t"} field the timestamp. An empty payload yields
         * a {@code null} value.
         *
         * @param topic the full MQTT topic the message arrived on
         * @param message the received MQTT message
         * @throws AstarteTransportException if handling a purge-properties control message fails
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws AstarteTransportException {
            byte[] rawPayload = message.getPayload();
            System.out.println("Incoming message: " + new String(rawPayload));
            String baseTopic = AstarteMqttV1Transport.this.m_baseTopic;
            if (!topic.contains(baseTopic) || AstarteMqttV1Transport.this.m_messageListener == null) {
                return;
            }
            String path = topic.replace(baseTopic + "/", "");
            if (path.startsWith("control")) {
                if (Objects.equals(path, "control/consumer/properties")) {
                    AstarteMqttV1Transport.this.handlePurgeProperties(rawPayload);
                } else {
                    System.err.println("Unhandled control message!" + path);
                }
                return;
            }

            // The interface name is the first path segment and the interface path is whatever
            // follows it. Only the leading occurrence is stripped: removing the name with
            // String.replace would also delete it from later segments of the path, and
            // split("/")[0] throws on a path made only of separators.
            int separator = path.indexOf('/');
            String astarteInterface = separator < 0 ? path : path.substring(0, separator);
            String interfacePath = path.substring(astarteInterface.length());
            if (!AstarteMqttV1Transport.this.getDevice().hasInterface(astarteInterface)) {
                System.err.println("Got an unexpected interface! " + astarteInterface);
                return;
            }

            Object payload = null;
            DateTime timestamp = null;
            if (rawPayload.length != 0) {
                AstarteMqttV1Transport.this.mBSONCallback.reset();
                AstarteMqttV1Transport.this.mBSONDecoder.decode(rawPayload, AstarteMqttV1Transport.this.mBSONCallback);
                BSONObject astartePayload = (BSONObject)AstarteMqttV1Transport.this.mBSONCallback.get();
                payload = astartePayload.get("v");
                if (astartePayload.containsField("t")) {
                    timestamp = new DateTime(astartePayload.get("t"));
                }
            }

            AstarteInterface targetInterface = AstarteMqttV1Transport.this.getDevice().getInterface(astarteInterface);
            if (targetInterface instanceof AstarteServerAggregateDatastreamInterface) {
                // An aggregate without a value carries nothing to deliver.
                if (payload != null) {
                    this.dispatchAggregate(targetInterface, astarteInterface, interfacePath, (BSONObject)payload, timestamp);
                }
                return;
            }
            this.dispatchIndividual(targetInterface, astarteInterface, interfacePath, payload, timestamp);
        }

        /**
         * Builds the aggregate value map from the BSON object and delivers it to every
         * listener of the aggregate interface. A key is kept only when
         * {@code interfacePath + "/" + key} is compatible with one of the interface mappings;
         * values of DateTime-typed mappings are converted to {@link DateTime}.
         */
        private void dispatchAggregate(AstarteInterface targetInterface, String interfaceName, String interfacePath, BSONObject values, DateTime timestamp) throws AstarteTransportException {
            HashMap<String, Object> astarteAggregate = new HashMap<String, Object>();
            for (String key : values.keySet()) {
                for (Map.Entry<String, AstarteInterfaceMapping> m : targetInterface.getMappings().entrySet()) {
                    if (!AstarteInterface.isPathCompatibleWithMapping(interfacePath + "/" + key, m.getValue().getPath())) {
                        continue;
                    }
                    Object value = values.get(key);
                    if (m.getValue().getType() == DateTime.class) {
                        value = this.toDateTime(value);
                    }
                    astarteAggregate.put(key, value);
                }
            }
            AstarteServerAggregateDatastreamInterface aggregateInterface = (AstarteServerAggregateDatastreamInterface)targetInterface;
            AstarteAggregateDatastreamEvent event = new AstarteAggregateDatastreamEvent(interfaceName, astarteAggregate, timestamp);
            for (AstarteAggregateDatastreamEventListener listener : aggregateInterface.getAllListeners()) {
                listener.valueReceived(event);
            }
        }

        /**
         * Delivers an individual value to the listeners of a server datastream or server
         * property interface. For properties a {@code null} value is reported through
         * {@code propertyUnset}, any other value through {@code propertyReceived}. Interfaces
         * of any other kind are ignored.
         */
        private void dispatchIndividual(AstarteInterface targetInterface, String interfaceName, String interfacePath, Object payload, DateTime timestamp) throws AstarteTransportException {
            AstarteInterfaceMapping targetMapping = null;
            for (Map.Entry<String, AstarteInterfaceMapping> entry : targetInterface.getMappings().entrySet()) {
                if (AstarteInterface.isPathCompatibleWithMapping(interfacePath, entry.getKey())) {
                    targetMapping = entry.getValue();
                    break;
                }
            }
            if (targetMapping == null) {
                System.err.println(String.format("Got an unexpected path %s for interface %s!", interfacePath, targetInterface.getInterfaceName()));
                return;
            }

            Object astarteValue = payload;
            if (targetMapping.getType() == DateTime.class) {
                astarteValue = this.toDateTime(payload);
            }

            if (targetInterface instanceof AstarteServerDatastreamInterface) {
                AstarteDatastreamEvent event = new AstarteDatastreamEvent(interfaceName, interfacePath, astarteValue, timestamp);
                for (AstarteDatastreamEventListener listener : ((AstarteServerDatastreamInterface)targetInterface).getAllListeners()) {
                    listener.valueReceived(event);
                }
            } else if (targetInterface instanceof AstarteServerPropertyInterface) {
                AstartePropertyEvent event = new AstartePropertyEvent(interfaceName, interfacePath, astarteValue);
                for (AstartePropertyEventListener listener : ((AstarteServerPropertyInterface)targetInterface).getAllListeners()) {
                    if (astarteValue == null) {
                        listener.propertyUnset(event);
                    } else {
                        listener.propertyReceived(event);
                    }
                }
            }
        }

        /**
         * Converts a decoded value to {@link DateTime}, keeping {@code null} as {@code null}.
         * Joda-Time's {@code DateTime(Object)} constructor treats a null argument as the
         * current instant, which would turn an unset DateTime property into a received one.
         */
        private DateTime toDateTime(Object value) {
            return value == null ? null : new DateTime(value);
        }

        /** Delivery confirmations are not used by this transport. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    };

    /**
     * Creates a transport speaking the Astarte MQTT v1 protocol over the given connection.
     * The connection's client id becomes the base topic, and this transport's MQTT callback
     * is registered through {@code setCallback}.
     *
     * @param connectionInfo connection parameters, also the source of the client id
     */
    public AstarteMqttV1Transport(MutualSSLAuthenticationMqttConnectionInfo connectionInfo) {
        super(AstarteProtocolType.ASTARTE_MQTT_V1, connectionInfo);
        m_baseTopic = connectionInfo.getClientId();
        setCallback(mMqttCallback);
    }

    /**
     * Sets the failed-message storage by delegating unchanged to the superclass.
     *
     * @param failedMessageStorage the storage handed to the superclass
     */
    @Override
    public void setFailedMessageStorage(AstarteFailedMessageStorage failedMessageStorage) {
        super.setFailedMessageStorage(failedMessageStorage);
    }

    /**
     * Publishes the device introspection on the base topic with QoS 2, not retained.
     *
     * <p>The payload is a {@code ;}-separated list of {@code name:major:minor} entries, one
     * per interface of the device, encoded as UTF-8. A device with no interfaces publishes
     * an empty payload.
     *
     * @throws AstarteTransportException if the MQTT publish fails
     */
    @Override
    public void sendIntrospection() throws AstarteTransportException {
        StringBuilder introspection = new StringBuilder();
        for (AstarteInterface astarteInterface : this.getDevice().getAllInterfaces()) {
            // Separator goes before every entry except the first, so there is no trailing
            // ';' to strip and an empty interface list cannot index out of bounds.
            if (introspection.length() > 0) {
                introspection.append(';');
            }
            introspection.append(astarteInterface.getInterfaceName())
                    .append(':')
                    .append(astarteInterface.getMajorVersion())
                    .append(':')
                    .append(astarteInterface.getMinorVersion());
        }
        MqttMessage introspectionMessage = new MqttMessage();
        // Explicit charset: the bytes must not depend on the platform default encoding.
        introspectionMessage.setPayload(introspection.toString().getBytes(StandardCharsets.UTF_8));
        introspectionMessage.setRetained(false);
        introspectionMessage.setQos(2);
        try {
            this.m_client.publish(this.m_baseTopic, introspectionMessage);
        }
        catch (MqttException e) {
            throw new AstarteTransportException(e);
        }
    }

    /**
     * Publishes the payload {@code "1"} on {@code <base topic>/control/emptyCache} with
     * QoS 2, not retained.
     *
     * @throws AstarteTransportException if the MQTT publish fails
     */
    @Override
    public void sendEmptyCache() throws AstarteTransportException {
        final String emptyCacheTopic = m_baseTopic + "/control/emptyCache";
        final MqttMessage request = new MqttMessage();
        request.setPayload("1".getBytes());
        request.setRetained(false);
        request.setQos(2);
        try {
            m_client.publish(emptyCacheTopic, request);
        } catch (MqttException publishFailure) {
            throw new AstarteTransportException(publishFailure);
        }
    }

    /**
     * Re-sends every stored value of every device property interface. Does nothing when no
     * property storage is configured; interfaces for which the storage returns {@code null}
     * are skipped.
     *
     * @throws AstarteTransportException if reading the storage fails or a value cannot be sent
     */
    @Override
    public void resendAllProperties() throws AstarteTransportException {
        if (m_propertyStorage == null) {
            return;
        }
        for (AstarteInterface candidate : getDevice().getAllInterfaces()) {
            if (candidate instanceof AstarteDevicePropertyInterface) {
                final Map<String, Object> storedValues;
                try {
                    storedValues = m_propertyStorage.getStoredValuesForInterface(candidate);
                } catch (AstartePropertyStorageException storageFailure) {
                    throw new AstarteTransportException("Failed to resend properties", storageFailure);
                }
                if (storedValues != null) {
                    for (Map.Entry<String, Object> stored : storedValues.entrySet()) {
                        sendIndividualValue(candidate, stored.getKey(), stored.getValue());
                    }
                }
            }
        }
    }

    /**
     * Drains the failed-message storage from the front: expired messages are rejected,
     * the others are re-published and acknowledged. The first publish failure aborts the
     * loop and leaves the failing message at the head of the storage.
     *
     * @throws AstarteTransportException if re-publishing a message fails
     */
    public void retryFailedMessages() throws AstarteTransportException {
        while (!m_failedMessageStorage.isEmpty()) {
            final AstarteFailedMessage head = m_failedMessageStorage.peekFirst();
            if (!head.isExpired()) {
                try {
                    doSendMqttMessage(head);
                } catch (MqttException publishFailure) {
                    throw new AstarteTransportException(publishFailure);
                }
                m_failedMessageStorage.ackFirst();
            } else {
                m_failedMessageStorage.rejectFirst();
            }
        }
    }

    /**
     * Publishes a single value on {@code <base topic>/<interface name><path>}.
     *
     * <p>For datastream interfaces the QoS comes from the reliability of the mapping matching
     * {@code path}; for every other interface QoS 2 is used. When the publish fails, a
     * datastream message is handled according to its mapping's retention policy, while any
     * other message goes through the properties failure handling.
     *
     * @param astarteInterface the interface the value belongs to
     * @param path the path inside the interface, appended verbatim to the topic
     * @param value the value to encode; {@code null} produces an empty payload
     * @param timestamp optional timestamp added to the payload, may be {@code null}
     * @throws AstarteTransportException if no mapping matches the path of a datastream
     *     interface, or if the publish failure cannot be recovered by storing the message
     */
    @Override
    public void sendIndividualValue(AstarteInterface astarteInterface, String path, Object value, DateTime timestamp) throws AstarteTransportException {
        int qos = 2;
        AstarteInterfaceDatastreamMapping mapping = null;
        boolean isDatastream = astarteInterface instanceof AstarteDatastreamInterface;
        if (isDatastream) {
            try {
                mapping = (AstarteInterfaceDatastreamMapping)AstarteInterface.findMappingInInterface(astarteInterface, path);
            }
            catch (AstarteInterfaceMappingNotFoundException e) {
                throw new AstarteTransportException("Mapping not found", e);
            }
            qos = this.qosFromReliability(mapping);
        }
        String topic = this.m_baseTopic + "/" + astarteInterface.getInterfaceName() + path;
        byte[] payload = this.objectToEncodedBSON(value, timestamp);
        try {
            this.doSendMqttMessage(topic, payload, qos);
        }
        catch (MqttException e) {
            // Exactly one handler per failure: running the properties handler after the
            // datastream one would store the same datastream message a second time.
            if (isDatastream) {
                this.handleDatastreamFailedPublish(e, mapping, topic, payload, qos);
            } else {
                this.handlePropertiesFailedPublish(e, topic, payload, qos);
            }
        }
    }

    /**
     * Publishes an aggregate value on {@code <base topic>/<interface name><path>}. The QoS
     * and the failure handling are taken from the first mapping returned by the interface.
     *
     * @param astarteInterface the aggregate datastream interface
     * @param path the path inside the interface, appended verbatim to the topic
     * @param value the aggregate members keyed by name
     * @param timestamp optional timestamp added to the payload, may be {@code null}
     * @throws AstarteTransportException if the mapping cannot be obtained, or if the publish
     *     failure cannot be recovered by storing the message
     */
    @Override
    public void sendAggregate(AstarteAggregateDatastreamInterface astarteInterface, String path, Map<String, Object> value, DateTime timestamp) throws AstarteTransportException {
        final AstarteInterfaceDatastreamMapping firstMapping;
        final int qos;
        try {
            Object[] allMappings = astarteInterface.getMappings().values().toArray();
            firstMapping = (AstarteInterfaceDatastreamMapping) allMappings[0];
            qos = qosFromReliability(firstMapping);
        } catch (Exception lookupFailure) {
            throw new AstarteTransportException("Mapping not found", lookupFailure);
        }

        final String topic = m_baseTopic + "/" + astarteInterface.getInterfaceName() + path;
        final byte[] encoded = objectToEncodedBSON(value, timestamp);
        try {
            doSendMqttMessage(topic, encoded, qos);
        } catch (MqttException publishFailure) {
            handleDatastreamFailedPublish(publishFailure, firstMapping, topic, encoded, qos);
        }
    }

    /**
     * Handles a failed property publish: a temporary failure stores the message in the
     * failed-message storage, any other failure is rethrown wrapped.
     *
     * @throws AstarteTransportException if the failure is not temporary
     */
    private void handlePropertiesFailedPublish(MqttException e, String topic, byte[] payload, int qos) throws AstarteTransportException {
        if (isTemporaryException(e)) {
            m_failedMessageStorage.insertStored(topic, payload, qos);
            return;
        }
        throw new AstarteTransportException(e);
    }

    /**
     * Handles a failed datastream publish according to the mapping's retention policy.
     * Non-temporary failures are rethrown wrapped. For temporary ones: DISCARD throws,
     * VOLATILE and STORED insert the message into the matching failed-message storage,
     * passing the expiry only when it is positive.
     *
     * @throws AstarteTransportException if the failure is not temporary or retention is DISCARD
     */
    private void handleDatastreamFailedPublish(MqttException e, AstarteInterfaceDatastreamMapping mapping, String topic, byte[] payload, int qos) throws AstarteTransportException {
        if (!isTemporaryException(e)) {
            throw new AstarteTransportException(e);
        }
        final int expiry = mapping.getExpiry();
        final boolean expires = expiry > 0;
        switch (mapping.getRetention()) {
            case DISCARD:
                throw new AstarteTransportException("Cannot send value", e);
            case VOLATILE:
                if (expires) {
                    m_failedMessageStorage.insertVolatile(topic, payload, qos, expiry);
                } else {
                    m_failedMessageStorage.insertVolatile(topic, payload, qos);
                }
                return;
            case STORED:
                if (expires) {
                    m_failedMessageStorage.insertStored(topic, payload, qos, expiry);
                } else {
                    m_failedMessageStorage.insertStored(topic, payload, qos);
                }
                return;
            default:
                // Any other retention value: nothing is stored and nothing is thrown.
                return;
        }
    }

    /**
     * Tells whether the failure's reason code is one of those this transport treats as
     * temporary, i.e. worth keeping the message for a later retry.
     *
     * <p>NOTE(review): the numeric codes appear to correspond to Paho's
     * {@code MqttException.REASON_CODE_*} constants (client exception, broker unavailable,
     * timeouts, disconnecting/not connected, connection lost, client closed, max inflight);
     * verify against the Paho version in use.
     *
     * @return {@code true} for reason codes 0, 3, 32000, 32002, 32102, 32104, 32109, 32111
     *     and 32202; {@code false} otherwise
     */
    private boolean isTemporaryException(MqttException e) {
        final int reason = e.getReasonCode();
        return reason == 0
                || reason == 3
                || reason == 32000
                || reason == 32002
                || reason == 32102
                || reason == 32104
                || reason == 32109
                || reason == 32111
                || reason == 32202;
    }

    /**
     * Maps a mapping's reliability to an MQTT QoS level: UNIQUE to 2, GUARANTEED to 1,
     * UNRELIABLE and anything else to 0.
     */
    private int qosFromReliability(AstarteInterfaceDatastreamMapping mapping) {
        int qos = 0;
        switch (mapping.getReliability()) {
            case UNIQUE:
                qos = 2;
                break;
            case GUARANTEED:
                qos = 1;
                break;
            case UNRELIABLE:
            default:
                break;
        }
        return qos;
    }

    /** Re-publishes a previously failed message with its stored topic, payload and QoS. */
    private void doSendMqttMessage(AstarteFailedMessage failedMessage) throws MqttException {
        final String topic = failedMessage.getTopic();
        final byte[] payload = failedMessage.getPayload();
        final int qos = failedMessage.getQos();
        doSendMqttMessage(topic, payload, qos);
    }

    /** Wraps the payload in a non-retained MQTT message with the given QoS and publishes it. */
    private void doSendMqttMessage(String topic, byte[] payload, int qos) throws MqttException {
        final MqttMessage outgoing = new MqttMessage();
        outgoing.setPayload(payload);
        outgoing.setRetained(false);
        outgoing.setQos(qos);
        doSendMqttMessage(topic, outgoing);
    }

    /** Publishes an already built MQTT message on the given topic through the client. */
    private void doSendMqttMessage(String topic, MqttMessage mqttMessage) throws MqttException {
        m_client.publish(topic, mqttMessage);
    }

    /**
     * Encodes a value, and optionally a timestamp, as a BSON document with the value under
     * {@code "v"} and the timestamp under {@code "t"}.
     *
     * <p>{@link DateTime} values, whether given directly or as members of a map, are
     * converted to {@code java.util.Date} before encoding. A map argument is copied, so the
     * caller's map is never modified and may be unmodifiable.
     *
     * @param o the value to encode; {@code null} yields an empty byte array
     * @param t the timestamp, or {@code null} to omit the {@code "t"} field
     * @return the encoded BSON document, or an empty array when {@code o} is {@code null}
     */
    private byte[] objectToEncodedBSON(Object o, DateTime t) {
        if (o == null) {
            return new byte[0];
        }
        Map<String, Object> bsonJavaObject = new HashMap<String, Object>();
        if (o instanceof DateTime) {
            bsonJavaObject.put("v", ((DateTime)o).toDate());
        } else if (o instanceof Map) {
            // Convert into a copy instead of rewriting entries in place; LinkedHashMap keeps
            // the iteration order of the source map, so the encoded field order is unchanged.
            Map<Object, Object> aggregate = new LinkedHashMap<Object, Object>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>)o).entrySet()) {
                Object member = entry.getValue();
                aggregate.put(entry.getKey(), member instanceof DateTime ? ((DateTime)member).toDate() : member);
            }
            bsonJavaObject.put("v", aggregate);
        } else {
            bsonJavaObject.put("v", o);
        }
        if (t != null) {
            bsonJavaObject.put("t", t.toDate());
        }
        Document bsonDocument = new Document(bsonJavaObject);
        BasicOutputBuffer out = new BasicOutputBuffer();
        try (BsonBinaryWriter w = new BsonBinaryWriter((BsonOutput)out)) {
            new DocumentCodec().encode((BsonWriter)w, bsonDocument, EncoderContext.builder().build());
            return out.toByteArray();
        }
    }

    /**
     * Intentionally empty: this class takes no action here when an asynchronous MQTT
     * action succeeds.
     *
     * @param asyncActionToken token of the completed action (unused)
     */
    public void onSuccess(IMqttToken asyncActionToken) {
    }

    /**
     * Subscribes with QoS 2 to the purge-properties control topic and to
     * {@code <base topic>/<interface name>/#} for every server-owned interface (aggregate
     * datastream, datastream or property). A subscription failure is only printed; the
     * remaining subscriptions are then not attempted.
     */
    private void setupSubscriptions() {
        try {
            m_client.subscribe(m_baseTopic + "/control/consumer/properties", 2);
            for (AstarteInterface candidate : getDevice().getAllInterfaces()) {
                final boolean serverOwned = candidate instanceof AstarteServerAggregateDatastreamInterface
                        || candidate instanceof AstarteServerDatastreamInterface
                        || candidate instanceof AstarteServerPropertyInterface;
                if (serverOwned) {
                    final String topicFilter = m_baseTopic + "/" + candidate.getInterfaceName() + "/#";
                    m_client.subscribe(topicFilter, 2);
                }
            }
        } catch (MqttException subscribeFailure) {
            subscribeFailure.printStackTrace();
        }
    }

    /**
     * Handles a purge-properties control message.
     *
     * <p>The payload is a 4-byte header followed by deflate-compressed text (the header is
     * skipped, not interpreted; presumably it carries the uncompressed size — confirm
     * against the protocol). The text is a {@code ;}-separated list of
     * {@code <interface name>/<path>} entries. Entries are grouped by interface, keeping only
     * the device's server property interfaces, and the result is handed to the property
     * storage's {@code purgeProperties}. Nothing is done when no property storage is set.
     *
     * @param payload the raw control message payload
     * @throws AstarteTransportException if the payload is malformed or cannot be
     *     decompressed, or if the storage fails to purge
     */
    private void handlePurgeProperties(byte[] payload) throws AstarteTransportException {
        if (this.m_propertyStorage == null) {
            return;
        }
        if (payload.length < 4) {
            throw new AstarteTransportException("Malformed purge properties payload", new IllegalArgumentException("Payload shorter than the 4-byte header: " + payload.length + " bytes"));
        }

        // Collect all bytes first and decode once: decoding each 256-byte chunk separately
        // can split a multi-byte character across two chunks.
        ByteArrayOutputStream inflated = new ByteArrayOutputStream();
        try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(payload, 4, payload.length - 4))) {
            byte[] buf = new byte[256];
            int rlen;
            while ((rlen = iis.read(buf)) != -1) {
                inflated.write(buf, 0, rlen);
            }
        }
        catch (IOException e) {
            // Purging with a truncated list would delete properties that must be kept,
            // so a decompression failure aborts the purge.
            throw new AstarteTransportException("Failed to decompress purge properties payload", e);
        }
        // Explicit charset so the result does not depend on the platform default.
        String purgePropertiesPayload = new String(inflated.toByteArray(), StandardCharsets.UTF_8);

        HashMap<String, List<String>> availableProperties = new HashMap<String, List<String>>();
        for (AstarteInterface astarteInterface : this.getDevice().getAllInterfaces()) {
            if (astarteInterface instanceof AstarteServerPropertyInterface) {
                availableProperties.put(astarteInterface.getInterfaceName(), new ArrayList<String>());
            }
        }
        for (String property : purgePropertiesPayload.split(";")) {
            String[] propertyTokens = property.split("/", 2);
            if (propertyTokens.length != 2) {
                continue;
            }
            List<String> pathList = availableProperties.get(propertyTokens[0]);
            if (pathList != null) {
                pathList.add("/" + propertyTokens[1]);
            }
        }
        try {
            this.m_propertyStorage.purgeProperties(availableProperties);
        }
        catch (AstartePropertyStorageException e) {
            throw new AstarteTransportException("Failed to purge properties", e);
        }
    }

    /**
     * Runs the post-connection sequence. When the broker reports no existing session, or the
     * introspection has not been sent yet, it sets up subscriptions, sends introspection and
     * empty-cache, marks the introspection as sent and re-sends all properties. It then
     * retries the failed messages and finally notifies the transport event listener (or logs
     * when none is set).
     *
     * @param asyncActionToken token of the completed connect action
     * @throws AstarteTransportException if any step of the sequence fails
     */
    @Override
    protected void onConnected(IMqttToken asyncActionToken) throws AstarteTransportException {
        final boolean sessionMissing = !asyncActionToken.getSessionPresent();
        if (sessionMissing || !m_introspectionSent) {
            setupSubscriptions();
            sendIntrospection();
            sendEmptyCache();
            m_introspectionSent = true;
            resendAllProperties();
        }

        try {
            retryFailedMessages();
        } catch (AstarteTransportException redeliveryFailure) {
            throw new AstarteTransportException("Message redelivery failed", redeliveryFailure);
        }

        if (m_astarteTransportEventListener == null) {
            System.out.println("Transport Connected");
        } else {
            m_astarteTransportEventListener.onTransportConnected();
        }
    }
}

