package io.relayr.java.websocket;

import com.google.gson.Gson;
import io.relayr.java.api.ChannelApi;
import io.relayr.java.model.DataPackage;
import io.relayr.java.model.Device;
import io.relayr.java.model.action.Command;
import io.relayr.java.model.action.Configuration;
import io.relayr.java.model.action.Reading;
import io.relayr.java.model.channel.ChannelDefinition;
import io.relayr.java.model.channel.ChannelTransport;
import io.relayr.java.model.channel.DataChannel;
import io.relayr.java.model.channel.PublishChannel;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.paho.client.mqttv3.MqttException;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

@Singleton
/* loaded from: input_file:io/relayr/java/websocket/WebSocketClient.class */
public class WebSocketClient {
    private final ChannelApi channelApi;
    private final WebSocket<DataChannel> webSocket;
    ReplaySubject<DataChannel> pubChannel;
    private final Map<String, DataChannel> subChannels = new HashMap();
    final Map<String, PublishSubject<Reading>> subObservers = new HashMap();
    final Map<String, DataChannel> pubChannels = new HashMap();
    final Map<String, AsyncSubject<Void>> pubObservers = new HashMap();
    final Map<String, Subject> actionObservers = new HashMap();

    @Inject
    public WebSocketClient(ChannelApi channelApi, WebSocketFactory webSocketFactory) {
        this.channelApi = channelApi;
        this.webSocket = webSocketFactory.createWebSocket();
    }

    public Observable<Reading> subscribe(Device device) {
        return (device == null || device.getId() == null) ? Observable.empty() : subscribe(device.getId());
    }

    public Observable<Reading> subscribe(String str) {
        return str == null ? Observable.empty() : !this.subObservers.containsKey(str) ? createSubObserver(str) : this.subObservers.get(str);
    }

    private synchronized Observable<Reading> createSubObserver(final String str) {
        final PublishSubject<Reading> create = PublishSubject.create();
        this.subObservers.put(str, create);
        this.channelApi.create(new ChannelDefinition(str, ChannelTransport.MQTT)).flatMap(new Func1<DataChannel, Observable<DataChannel>>() { // from class: io.relayr.java.websocket.WebSocketClient.2
            public Observable<DataChannel> call(DataChannel dataChannel) {
                return WebSocketClient.this.webSocket.createClient(dataChannel);
            }
        }).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<DataChannel>() { // from class: io.relayr.java.websocket.WebSocketClient.1
            public void onCompleted() {
            }

            public void onError(Throwable th) {
                th.printStackTrace();
                WebSocketClient.this.subObservers.remove(str);
            }

            public void onNext(DataChannel dataChannel) {
                WebSocketClient.this.subscribeToChannel(dataChannel, str, create);
            }
        });
        return create.doOnError(new Action1<Throwable>() { // from class: io.relayr.java.websocket.WebSocketClient.3
            public void call(Throwable th) {
                WebSocketClient.this.unSubscribe(str);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void subscribeToChannel(final DataChannel dataChannel, final String str, final PublishSubject<Reading> publishSubject) {
        this.webSocket.subscribe(dataChannel.getTopic(), dataChannel.getId(), new WebSocketCallback() { // from class: io.relayr.java.websocket.WebSocketClient.4
            @Override // io.relayr.java.websocket.WebSocketCallback
            public void connectCallback(Object obj) {
                if (WebSocketClient.this.subChannels.containsKey(str)) {
                    return;
                }
                WebSocketClient.this.subChannels.put(str, dataChannel);
            }

            @Override // io.relayr.java.websocket.WebSocketCallback
            public void disconnectCallback(Object obj) {
                publishSubject.onError((Throwable) obj);
                WebSocketClient.this.subChannels.remove(str);
                WebSocketClient.this.subObservers.remove(str);
            }

            @Override // io.relayr.java.websocket.WebSocketCallback
            public void successCallback(Object obj) {
                DataPackage dataPackage = (DataPackage) new Gson().fromJson(obj.toString(), DataPackage.class);
                for (DataPackage.Data data : dataPackage.readings) {
                    publishSubject.onNext(new Reading(dataPackage.received, data.recorded, data.meaning, data.path, data.value));
                }
            }

            @Override // io.relayr.java.websocket.WebSocketCallback
            public void errorCallback(Throwable th) {
                publishSubject.onError(th);
                WebSocketClient.this.subChannels.remove(str);
                WebSocketClient.this.subObservers.remove(str);
            }
        });
    }

    public void unSubscribe(String str) {
        if (this.subObservers.containsKey(str)) {
            this.subObservers.get(str).onCompleted();
            this.subObservers.remove(str);
        }
        if (!this.subChannels.isEmpty() && this.subChannels.containsKey(str) && this.webSocket.unSubscribe(this.subChannels.get(str).getCredentials().getTopic())) {
            this.subChannels.remove(str);
        }
    }

    public synchronized Observable<Void> publish(final String str, final Reading reading) {
        AsyncSubject<Void> asyncSubject = this.pubObservers.get(str);
        if (asyncSubject != null) {
            publishData(str, reading, asyncSubject);
            return asyncSubject;
        }
        final AsyncSubject create = AsyncSubject.create();
        createPubChannel(str).subscribeOn(Schedulers.newThread()).subscribe(new Observer<DataChannel>() { // from class: io.relayr.java.websocket.WebSocketClient.5
            public void onCompleted() {
            }

            public void onError(Throwable th) {
                WebSocketClient.this.pubChannels.remove(str);
                WebSocketClient.this.pubObservers.remove(str);
                create.onError(th);
            }

            public void onNext(DataChannel dataChannel) {
                if (!WebSocketClient.this.pubObservers.containsKey(str)) {
                    WebSocketClient.this.pubObservers.put(str, create);
                }
                if (!WebSocketClient.this.pubChannels.containsKey(str)) {
                    WebSocketClient.this.pubChannels.put(str, dataChannel);
                }
                WebSocketClient.this.publishData(str, reading, create);
            }
        });
        return clearIfError(create, str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publishData(String str, Reading reading, AsyncSubject<Void> asyncSubject) {
        try {
            this.webSocket.publish(this.pubChannels.get(str).getCredentials().getTopic() + "data", new Gson().toJson(reading));
            asyncSubject.onNext((Object) null);
            asyncSubject.onCompleted();
        } catch (MqttException e) {
            this.pubChannels.remove(str);
            this.pubObservers.remove(str);
            asyncSubject.onError(e);
        }
    }

    public Observable<Command> subscribeToCommands(String str) {
        return str == null ? Observable.empty() : !this.actionObservers.containsKey(str) ? createActionObserver(str, PublishSubject.create(), Command.class) : this.actionObservers.get(str);
    }

    public Observable<Configuration> subscribeToConfigurations(String str) {
        return str == null ? Observable.empty() : !this.actionObservers.containsKey(str) ? createActionObserver(str, PublishSubject.create(), Configuration.class) : this.actionObservers.get(str);
    }

    synchronized <T> Observable<T> createActionObserver(final String str, final PublishSubject<T> publishSubject, final Class<T> cls) {
        this.actionObservers.put(str, publishSubject);
        createPubChannel(str).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<DataChannel>() { // from class: io.relayr.java.websocket.WebSocketClient.6
            public void onCompleted() {
            }

            public void onError(Throwable th) {
                th.printStackTrace();
                WebSocketClient.this.pubChannels.remove(str);
                WebSocketClient.this.actionObservers.remove(str);
            }

            public void onNext(DataChannel dataChannel) {
                if (!WebSocketClient.this.pubChannels.containsKey(str)) {
                    WebSocketClient.this.pubChannels.put(str, dataChannel);
                }
                WebSocketClient.this.webSocket.subscribe(dataChannel.getTopic() + (cls == Command.class ? "cmd" : "conf"), dataChannel.getId(), new WebSocketCallback() { // from class: io.relayr.java.websocket.WebSocketClient.6.1
                    @Override // io.relayr.java.websocket.WebSocketCallback
                    public void connectCallback(Object obj) {
                    }

                    @Override // io.relayr.java.websocket.WebSocketCallback
                    public void disconnectCallback(Object obj) {
                        publishSubject.onError((Throwable) obj);
                        WebSocketClient.this.actionObservers.remove(str);
                    }

                    @Override // io.relayr.java.websocket.WebSocketCallback
                    public void successCallback(Object obj) {
                        publishSubject.onNext(new Gson().fromJson(obj.toString(), cls));
                    }

                    @Override // io.relayr.java.websocket.WebSocketCallback
                    public void errorCallback(Throwable th) {
                        publishSubject.onError(th);
                        WebSocketClient.this.actionObservers.remove(str);
                    }
                });
            }
        });
        return clearIfError(publishSubject, str);
    }

    synchronized Observable<DataChannel> createPubChannel(String str) {
        if (this.pubChannel != null) {
            return this.pubChannel;
        }
        this.pubChannel = ReplaySubject.create();
        this.channelApi.createForDevice(new ChannelDefinition(str, ChannelTransport.MQTT), str).subscribeOn(Schedulers.newThread()).flatMap(new Func1<PublishChannel, Observable<DataChannel>>() { // from class: io.relayr.java.websocket.WebSocketClient.9
            public Observable<DataChannel> call(PublishChannel publishChannel) {
                return WebSocketClient.this.webSocket.createClient(publishChannel);
            }
        }).subscribe(new Action1<DataChannel>() { // from class: io.relayr.java.websocket.WebSocketClient.7
            public void call(DataChannel dataChannel) {
                WebSocketClient.this.pubChannel.onNext(dataChannel);
            }
        }, new Action1<Throwable>() { // from class: io.relayr.java.websocket.WebSocketClient.8
            public void call(Throwable th) {
                WebSocketClient.this.pubChannel.onError(th);
            }
        });
        return this.pubChannel;
    }

    private <T> Observable<T> clearIfError(Subject<T, T> subject, final String str) {
        return subject.doOnError(new Action1<Throwable>() { // from class: io.relayr.java.websocket.WebSocketClient.10
            public void call(Throwable th) {
                WebSocketClient.this.pubChannels.remove(str);
                WebSocketClient.this.pubObservers.remove(str);
            }
        });
    }
}
