/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.contribs.queue.nats;

import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import io.nats.client.NUID;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;

public abstract class NATSAbstractQueue
implements ObservableQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(NATSAbstractQueue.class);
    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue();
    protected final Lock mu = new ReentrantLock();
    private final String queueType;
    private ScheduledExecutorService execs;
    private final Scheduler scheduler;
    protected final String queueURI;
    protected final String subject;
    protected String queue;
    private boolean observable;
    private boolean isOpened;
    private volatile boolean running;

    NATSAbstractQueue(String queueURI, String queueType, Scheduler scheduler) {
        this.queueURI = queueURI;
        this.queueType = queueType;
        this.scheduler = scheduler;
        if (queueURI.contains(":")) {
            this.subject = queueURI.substring(0, queueURI.indexOf(58));
            this.queue = queueURI.substring(queueURI.indexOf(58) + 1);
        } else {
            this.subject = queueURI;
            this.queue = null;
        }
        LOGGER.info(String.format("Initialized with queueURI=%s, subject=%s, queue=%s", queueURI, this.subject, this.queue));
    }

    void onMessage(String subject, byte[] data) {
        String payload = new String(data);
        LOGGER.info(String.format("Received message for %s: %s", subject, payload));
        Message dstMsg = new Message();
        dstMsg.setId(NUID.nextGlobal());
        dstMsg.setPayload(payload);
        this.messages.add(dstMsg);
    }

    public Observable<Message> observe() {
        LOGGER.info("Observe invoked for queueURI " + this.queueURI);
        this.observable = true;
        this.mu.lock();
        try {
            this.subscribe();
        }
        finally {
            this.mu.unlock();
        }
        Observable.OnSubscribe onSubscribe = subscriber -> {
            Observable interval = Observable.interval((long)100L, (TimeUnit)TimeUnit.MILLISECONDS, (Scheduler)this.scheduler);
            interval.flatMap(x -> {
                if (!this.isRunning()) {
                    LOGGER.debug("Component stopped, skip listening for messages from NATS Queue");
                    return Observable.from(Collections.emptyList());
                }
                LinkedList available = new LinkedList();
                this.messages.drainTo(available);
                if (!available.isEmpty()) {
                    AtomicInteger count = new AtomicInteger(0);
                    StringBuilder buffer = new StringBuilder();
                    available.forEach(msg -> {
                        buffer.append(msg.getId()).append("=").append(msg.getPayload());
                        count.incrementAndGet();
                        if (count.get() < available.size()) {
                            buffer.append(",");
                        }
                    });
                    LOGGER.info(String.format("Batch from %s to conductor is %s", this.subject, buffer.toString()));
                }
                return Observable.from(available);
            }).subscribe(arg_0 -> ((Subscriber)subscriber).onNext(arg_0), arg_0 -> ((Subscriber)subscriber).onError(arg_0));
        };
        return Observable.create((Observable.OnSubscribe)onSubscribe);
    }

    public String getType() {
        return this.queueType;
    }

    public String getName() {
        return this.queueURI;
    }

    public String getURI() {
        return this.queueURI;
    }

    public List<String> ack(List<Message> messages) {
        return Collections.emptyList();
    }

    public void setUnackTimeout(Message message, long unackTimeout) {
    }

    public long size() {
        return this.messages.size();
    }

    public void publish(List<Message> messages) {
        messages.forEach(message -> {
            try {
                String payload = message.getPayload();
                this.publish(this.subject, payload.getBytes());
                LOGGER.info(String.format("Published message to %s: %s", this.subject, payload));
            }
            catch (Exception ex) {
                LOGGER.error("Failed to publish message " + message.getPayload() + " to " + this.subject, (Throwable)ex);
                throw new RuntimeException(ex);
            }
        });
    }

    public boolean rePublishIfNoAck() {
        return true;
    }

    public void close() {
        LOGGER.info("Closing connection for " + this.queueURI);
        this.mu.lock();
        try {
            if (this.execs != null) {
                this.execs.shutdownNow();
                this.execs = null;
            }
            this.closeSubs();
            this.closeConn();
            this.isOpened = false;
        }
        finally {
            this.mu.unlock();
        }
    }

    public void open() {
        if (this.isOpened) {
            return;
        }
        this.mu.lock();
        try {
            try {
                this.connect();
                if (this.observable) {
                    this.subscribe();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.execs = Executors.newScheduledThreadPool(1);
            this.execs.scheduleAtFixedRate(this::monitor, 0L, 500L, TimeUnit.MILLISECONDS);
            this.isOpened = true;
        }
        finally {
            this.mu.unlock();
        }
    }

    private void monitor() {
        if (this.isConnected()) {
            return;
        }
        LOGGER.error("Monitor invoked for " + this.queueURI);
        this.mu.lock();
        try {
            this.closeSubs();
            this.closeConn();
            this.connect();
            if (this.observable) {
                this.subscribe();
            }
        }
        catch (Exception ex) {
            LOGGER.error("Monitor failed with " + ex.getMessage() + " for " + this.queueURI, (Throwable)ex);
        }
        finally {
            this.mu.unlock();
        }
    }

    public boolean isClosed() {
        return !this.isOpened;
    }

    void ensureConnected() {
        if (!this.isConnected()) {
            throw new RuntimeException("No nats connection");
        }
    }

    public void start() {
        LOGGER.info("Started listening to {}:{}", (Object)this.getClass().getSimpleName(), (Object)this.queueURI);
        this.running = true;
    }

    public void stop() {
        LOGGER.info("Stopped listening to {}:{}", (Object)this.getClass().getSimpleName(), (Object)this.queueURI);
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    abstract void connect();

    abstract boolean isConnected();

    abstract void publish(String var1, byte[] var2) throws Exception;

    abstract void subscribe();

    abstract void closeSubs();

    abstract void closeConn();
}

