/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.contribs.queue.nats;

import com.netflix.conductor.contribs.queue.nats.JsmMessage;
import com.netflix.conductor.contribs.queue.nats.LoggingNatsErrorListener;
import com.netflix.conductor.contribs.queue.nats.NatsException;
import com.netflix.conductor.contribs.queue.nats.config.JetStreamProperties;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.ErrorListener;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.NUID;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.availability.AvailabilityChangeEvent;
import org.springframework.boot.availability.AvailabilityState;
import org.springframework.boot.availability.LivenessState;
import org.springframework.context.ApplicationEventPublisher;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;

public class JetStreamObservableQueue
implements ObservableQueue {
    private static final Logger LOG = LoggerFactory.getLogger(JetStreamObservableQueue.class);
    private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue();
    private final Lock mu = new ReentrantLock();
    private final String queueType;
    private final String subject;
    private final String queueUri;
    private final JetStreamProperties properties;
    private final Scheduler scheduler;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ApplicationEventPublisher eventPublisher;
    private Connection nc;
    private JetStreamSubscription sub;
    private Observable<Long> interval;
    private final String queueGroup;

    public JetStreamObservableQueue(ConductorProperties conductorProperties, JetStreamProperties properties, String queueType, String queueUri, Scheduler scheduler, ApplicationEventPublisher eventPublisher) {
        LOG.debug("JSM obs queue create, qtype={}, quri={}", (Object)queueType, (Object)queueUri);
        this.queueUri = queueUri;
        if (queueUri.contains(":")) {
            this.subject = JetStreamObservableQueue.getQueuePrefix(conductorProperties, properties) + queueUri.substring(0, queueUri.indexOf(58));
            this.queueGroup = queueUri.substring(queueUri.indexOf(58) + 1);
        } else {
            this.subject = JetStreamObservableQueue.getQueuePrefix(conductorProperties, properties) + queueUri;
            this.queueGroup = null;
        }
        this.queueType = queueType;
        this.properties = properties;
        this.scheduler = scheduler;
        this.eventPublisher = eventPublisher;
    }

    public static String getQueuePrefix(ConductorProperties conductorProperties, JetStreamProperties properties) {
        Object stack = "";
        if (conductorProperties.getStack() != null && conductorProperties.getStack().length() > 0) {
            stack = conductorProperties.getStack() + "_";
        }
        return StringUtils.isBlank((CharSequence)properties.getListenerQueuePrefix()) ? conductorProperties.getAppId() + "_jsm_notify_" + (String)stack : properties.getListenerQueuePrefix();
    }

    public Observable<Message> observe() {
        return Observable.create(this.getOnSubscribe());
    }

    private Observable.OnSubscribe<Message> getOnSubscribe() {
        return subscriber -> {
            this.interval = Observable.interval((long)this.properties.getPollTimeDuration().toMillis(), (TimeUnit)TimeUnit.MILLISECONDS, (Scheduler)this.scheduler);
            this.interval.flatMap(x -> {
                if (!this.isRunning()) {
                    LOG.debug("Component stopped, skip listening for messages from JSM Queue '{}'", (Object)this.subject);
                    return Observable.from(Collections.emptyList());
                }
                ArrayList available = new ArrayList();
                this.messages.drainTo(available);
                if (!available.isEmpty()) {
                    LOG.debug("Processing JSM queue '{}' batch messages count={}", (Object)this.subject, (Object)available.size());
                }
                return Observable.from(available);
            }).subscribe(arg_0 -> ((Subscriber)subscriber).onNext(arg_0), arg_0 -> ((Subscriber)subscriber).onError(arg_0));
        };
    }

    public String getType() {
        return this.queueType;
    }

    public String getName() {
        return this.queueUri;
    }

    public String getURI() {
        return this.getName();
    }

    public List<String> ack(List<Message> messages) {
        messages.forEach(m -> ((JsmMessage)((Object)m)).getJsmMsg().ack());
        return Collections.emptyList();
    }

    public void publish(List<Message> messages) {
        try (Connection conn = Nats.connect((String)this.properties.getUrl());){
            JetStream js = conn.jetStream();
            for (Message msg : messages) {
                js.publish(this.subject, msg.getPayload().getBytes());
            }
        }
        catch (JetStreamApiException | IOException e) {
            throw new NatsException("Failed to publish to jsm", e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NatsException("Failed to publish to jsm", e);
        }
    }

    public void setUnackTimeout(Message message, long unackTimeout) {
    }

    public long size() {
        try {
            return this.sub.getConsumerInfo().getNumPending();
        }
        catch (JetStreamApiException | IOException e) {
            LOG.warn("Failed to get stream '{}' info", (Object)this.subject);
            return 0L;
        }
    }

    public void start() {
        this.mu.lock();
        try {
            this.natsConnect();
        }
        finally {
            this.mu.unlock();
        }
    }

    public void stop() {
        this.interval.unsubscribeOn(this.scheduler);
        try {
            if (this.nc != null) {
                this.nc.close();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Failed to close Nats connection", (Throwable)e);
        }
        this.running.set(false);
    }

    public boolean isRunning() {
        return this.running.get();
    }

    private void natsConnect() {
        if (this.running.get()) {
            return;
        }
        LOG.info("Starting JSM observable, name={}", (Object)this.queueUri);
        try {
            Nats.connectAsynchronously((Options)new Options.Builder().connectionListener((conn, type) -> {
                LOG.info("Connection to JSM updated: {}", (Object)type);
                if (ConnectionListener.Events.CLOSED.equals((Object)type)) {
                    LOG.error("Could not reconnect to NATS! Changing liveness status to {}!", (Object)LivenessState.BROKEN);
                    AvailabilityChangeEvent.publish((ApplicationEventPublisher)this.eventPublisher, (Object)type, (AvailabilityState)LivenessState.BROKEN);
                }
                this.nc = conn;
                this.subscribeOnce(conn, type);
            }).errorListener((ErrorListener)new LoggingNatsErrorListener()).server(this.properties.getUrl()).maxReconnects(this.properties.getMaxReconnects()).build(), (boolean)true);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NatsException("Failed to connect to JSM", e);
        }
    }

    private void createStream(JetStreamManagement jsm) {
        StreamConfiguration streamConfig = StreamConfiguration.builder().name(this.subject).replicas(this.properties.getReplicas()).retentionPolicy(RetentionPolicy.Limits).maxBytes(this.properties.getStreamMaxBytes()).storageType(StorageType.get((String)this.properties.getStreamStorageType())).build();
        try {
            StreamInfo streamInfo = jsm.addStream(streamConfig);
            LOG.debug("Updated stream, info: {}", (Object)streamInfo);
        }
        catch (JetStreamApiException | IOException e) {
            LOG.error("Failed to add stream: " + String.valueOf(streamConfig), e);
            AvailabilityChangeEvent.publish((ApplicationEventPublisher)this.eventPublisher, (Object)e, (AvailabilityState)LivenessState.BROKEN);
        }
    }

    private void subscribeOnce(Connection nc, ConnectionListener.Events type) {
        if (type.equals((Object)ConnectionListener.Events.CONNECTED) || type.equals((Object)ConnectionListener.Events.RECONNECTED)) {
            JetStreamManagement jsm;
            try {
                jsm = nc.jetStreamManagement();
            }
            catch (IOException e) {
                throw new NatsException("Failed to get jsm management", e);
            }
            this.createStream(jsm);
            ConsumerConfiguration consumerConfig = this.createConsumer(jsm);
            this.subscribe(nc, consumerConfig);
        }
    }

    private ConsumerConfiguration createConsumer(JetStreamManagement jsm) {
        ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder().name(this.properties.getDurableName()).deliverGroup(this.queueGroup).durable(this.properties.getDurableName()).ackWait(this.properties.getAckWait()).maxDeliver((long)this.properties.getMaxDeliver()).maxAckPending(this.properties.getMaxAckPending()).ackPolicy(AckPolicy.Explicit).deliverSubject(this.subject + "-deliver").deliverPolicy(DeliverPolicy.New).build();
        try {
            jsm.addOrUpdateConsumer(this.subject, consumerConfig);
            return consumerConfig;
        }
        catch (JetStreamApiException | IOException e) {
            throw new NatsException("Failed to add/update consumer", e);
        }
    }

    private void subscribe(Connection nc, ConsumerConfiguration consumerConfig) {
        try {
            JetStream js = nc.jetStream();
            PushSubscribeOptions pso = ((PushSubscribeOptions.Builder)((PushSubscribeOptions.Builder)((PushSubscribeOptions.Builder)PushSubscribeOptions.builder().configuration(consumerConfig)).stream(this.subject)).bind(true)).build();
            LOG.debug("Subscribing jsm, subject={}, options={}", (Object)this.subject, (Object)pso);
            this.sub = js.subscribe(this.subject, this.queueGroup, nc.createDispatcher(), msg -> {
                JsmMessage message = new JsmMessage();
                message.setJsmMsg(msg);
                message.setId(NUID.nextGlobal());
                message.setPayload(new String(msg.getData()));
                this.messages.add(message);
            }, false, pso);
            LOG.debug("Subscribed successfully {}", (Object)this.sub.getConsumerInfo());
            this.running.set(true);
        }
        catch (JetStreamApiException | IOException e) {
            throw new NatsException("Failed to subscribe", e);
        }
    }
}

