/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.contribs.queue.nats;

import com.netflix.conductor.contribs.queue.nats.NATSAbstractQueue;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Subscription;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;

public class NATSObservableQueue
extends NATSAbstractQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(NATSObservableQueue.class);
    private Subscription subs;
    private Connection conn;

    public NATSObservableQueue(String queueURI, Scheduler scheduler) {
        super(queueURI, "nats", scheduler);
        this.open();
    }

    @Override
    public boolean isConnected() {
        return this.conn != null && Connection.Status.CONNECTED.equals((Object)this.conn.getStatus());
    }

    @Override
    public void connect() {
        try {
            Connection temp = Nats.connect();
            LOGGER.info("Successfully connected for " + this.queueURI);
            this.conn = temp;
        }
        catch (Exception e) {
            LOGGER.error("Unable to establish nats connection for " + this.queueURI, (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void subscribe() {
        if (this.subs != null) {
            return;
        }
        try {
            this.ensureConnected();
            if (StringUtils.isNotEmpty((CharSequence)this.queue)) {
                LOGGER.info("No subscription. Creating a queue subscription. subject={}, queue={}", (Object)this.subject, (Object)this.queue);
                this.conn.createDispatcher(msg -> this.onMessage(msg.getSubject(), msg.getData()));
                this.subs = this.conn.subscribe(this.subject, this.queue);
            } else {
                LOGGER.info("No subscription. Creating a pub/sub subscription. subject={}", (Object)this.subject);
                this.conn.createDispatcher(msg -> this.onMessage(msg.getSubject(), msg.getData()));
                this.subs = this.conn.subscribe(this.subject);
            }
        }
        catch (Exception ex) {
            LOGGER.error("Subscription failed with " + ex.getMessage() + " for queueURI " + this.queueURI, (Throwable)ex);
        }
    }

    @Override
    public void publish(String subject, byte[] data) throws Exception {
        this.ensureConnected();
        this.conn.publish(subject, data);
    }

    @Override
    public void closeSubs() {
        if (this.subs != null) {
            try {
                this.subs.unsubscribe();
            }
            catch (Exception ex) {
                LOGGER.error("closeSubs failed with " + ex.getMessage() + " for " + this.queueURI, (Throwable)ex);
            }
            this.subs = null;
        }
    }

    @Override
    public void closeConn() {
        if (this.conn != null) {
            try {
                this.conn.close();
            }
            catch (Exception ex) {
                LOGGER.error("closeConn failed with " + ex.getMessage() + " for " + this.queueURI, (Throwable)ex);
            }
            this.conn = null;
        }
    }
}

