/*
 * Decompiled with CFR 0.152.
 */
package org.ektorp.impl.changes;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.NullNode;
import org.ektorp.changes.ChangesFeed;
import org.ektorp.changes.DocumentChange;
import org.ektorp.impl.changes.StdDocumentChange;
import org.ektorp.util.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ContinuousChangesFeed
implements ChangesFeed,
Runnable {
    private static final AtomicInteger THREAD_COUNT = new AtomicInteger();
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousChangesFeed.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final DocumentChange INTERRUPT_MARKER = new StdDocumentChange((JsonNode)NullNode.getInstance());
    private static final Set<Class<?>> INTERRUPTED_EXCEPTION_TYPES = new HashSet();
    private final BlockingQueue<DocumentChange> changes = new LinkedBlockingQueue<DocumentChange>();
    private final BufferedReader reader;
    private final Thread thread = new Thread(this);
    private volatile boolean shouldRun = true;

    public ContinuousChangesFeed(String dbName, InputStream changesStream) {
        try {
            this.reader = new BufferedReader(new InputStreamReader(changesStream, "UTF-8"));
            this.thread.setName(String.format("ektorp-%s-changes-listening-thread-%s", dbName, THREAD_COUNT.getAndIncrement()));
            this.thread.start();
        }
        catch (UnsupportedEncodingException e) {
            throw Exceptions.propagate(e);
        }
    }

    public DocumentChange next() throws InterruptedException {
        this.assertRunningState();
        DocumentChange c = this.changes.take();
        this.checkIfInterrupted(c);
        return c;
    }

    public DocumentChange next(long timeout, TimeUnit unit) throws InterruptedException {
        this.assertRunningState();
        DocumentChange c = this.changes.poll(timeout, unit);
        this.checkIfInterrupted(c);
        return c;
    }

    private void assertRunningState() {
        if (!this.isAlive()) {
            throw new IllegalStateException("Changes feed is not alive");
        }
    }

    private void checkIfInterrupted(DocumentChange c) throws InterruptedException {
        if (c == INTERRUPT_MARKER) {
            throw new InterruptedException();
        }
    }

    public void cancel() {
        LOG.debug("Feed cancelled");
        this.shouldRun = false;
        this.thread.interrupt();
    }

    private void sendInterruptMarker() {
        LOG.debug("Sending interrupt marker in order to interrupt feed consumer");
        this.changes.add(INTERRUPT_MARKER);
    }

    public boolean isAlive() {
        return this.thread.isAlive();
    }

    public int queueSize() {
        return this.changes.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        try {
            String line = this.reader.readLine();
            while (this.shouldRun && line != null) {
                if (line.length() > 0) {
                    this.handleChange(line);
                } else {
                    this.handleHeartbeat();
                }
                line = this.reader.readLine();
            }
            String reason = !this.shouldRun ? "Cancelled" : "EOF";
            LOG.info("Changes feed stopped. Reason: " + reason);
        }
        catch (Exception e) {
            this.handleException(e);
        }
        finally {
            this.sendInterruptMarker();
        }
    }

    private void handleChange(String line) throws IOException, JsonParseException, JsonMappingException {
        this.changes.add(new StdDocumentChange(OBJECT_MAPPER.readTree(line)));
    }

    private void handleHeartbeat() {
        LOG.debug("Got heartbeat from DB");
    }

    private void handleException(Exception e) {
        if (INTERRUPTED_EXCEPTION_TYPES.contains(e.getClass())) {
            LOG.info("Changes feed was interrupted");
        } else {
            LOG.error("Caught exception while listening to changes feed:", (Throwable)e);
        }
    }

    static {
        INTERRUPTED_EXCEPTION_TYPES.add(InterruptedException.class);
        INTERRUPTED_EXCEPTION_TYPES.add(InterruptedIOException.class);
    }
}

