/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.fahrschein;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.zalando.fahrschein.BackoffException;
import org.zalando.fahrschein.BackoffStrategy;
import org.zalando.fahrschein.CursorManager;
import org.zalando.fahrschein.EventAlreadyProcessedException;
import org.zalando.fahrschein.Listener;
import org.zalando.fahrschein.domain.Batch;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.fahrschein.metrics.MetricsCollector;
import org.zalando.fahrschein.metrics.NoMetricsCollector;

public class NakadiReader<T> {
    /** Logger used by the reader and by the nested {@code JsonInput} class. */
    private static final Logger LOG = LoggerFactory.getLogger(NakadiReader.class);
    /** Address the streaming GET request is issued against. */
    private final URI uri;
    /** Creates the HTTP request used to open (and re-open) the event stream. */
    private final ClientHttpRequestFactory clientHttpRequestFactory;
    /** Drives re-opening of the stream after an {@code IOException} while consuming. */
    private final BackoffStrategy backoffStrategy;
    /** Supplies the cursors sent with the request and is notified about batch success or failure. */
    private final CursorManager cursorManager;
    /** Serializes cursors for the request header; also the source of {@link #jsonFactory} and {@link #eventReader}. */
    private final ObjectMapper objectMapper;
    /** Event type name; passed to the cursor manager and used in log messages. */
    private final String eventName;
    /** When present, no {@code X-Nakadi-Cursors} header is added to the request. */
    private final Optional<Subscription> subscription;
    /** Target type events are deserialized into. */
    private final Class<T> eventClass;
    /** Receives each list of events as well as mapping exceptions raised while reading them. */
    private final Listener<T> listener;
    /** Factory obtained from {@link #objectMapper}; creates the parser over the response body. */
    private final JsonFactory jsonFactory;
    /** Reader obtained from {@link #objectMapper}, configured for {@link #eventClass}. */
    private final ObjectReader eventReader;
    /** Never {@code null}; the constructor substitutes a {@code NoMetricsCollector} when none is given. */
    private final MetricsCollector metricsCollector;

    /**
     * Creates a reader without an explicit metrics collector.
     *
     * <p>Delegates to the ten-argument constructor with {@code null} as the metrics collector,
     * which that constructor replaces with a {@code NoMetricsCollector}.
     */
    public NakadiReader(
            URI uri,
            ClientHttpRequestFactory clientHttpRequestFactory,
            BackoffStrategy backoffStrategy,
            CursorManager cursorManager,
            ObjectMapper objectMapper,
            String eventName,
            Optional<Subscription> subscription,
            Class<T> eventClass,
            Listener<T> listener) {
        this(
                uri,
                clientHttpRequestFactory,
                backoffStrategy,
                cursorManager,
                objectMapper,
                eventName,
                subscription,
                eventClass,
                listener,
                null);
    }

    public NakadiReader(URI uri, ClientHttpRequestFactory clientHttpRequestFactory, BackoffStrategy backoffStrategy, CursorManager cursorManager, ObjectMapper objectMapper, String eventName, Optional<Subscription> subscription, Class<T> eventClass, Listener<T> listener, @Nullable MetricsCollector metricsCollector) {
        Preconditions.checkState((!subscription.isPresent() || eventName.equals(Iterables.getOnlyElement(subscription.get().getEventTypes())) ? 1 : 0) != 0, (Object)"Only subscriptions to single event types are currently supported");
        this.uri = uri;
        this.clientHttpRequestFactory = clientHttpRequestFactory;
        this.backoffStrategy = backoffStrategy;
        this.cursorManager = cursorManager;
        this.objectMapper = objectMapper;
        this.eventName = eventName;
        this.subscription = subscription;
        this.eventClass = eventClass;
        this.listener = listener;
        this.metricsCollector = metricsCollector != null ? metricsCollector : new NoMetricsCollector();
        this.jsonFactory = this.objectMapper.getFactory();
        this.eventReader = this.objectMapper.reader().forType(eventClass);
        if (clientHttpRequestFactory instanceof HttpComponentsClientHttpRequestFactory) {
            LOG.warn("Using [{}] might block during reconnection, please consider using another implementation of ClientHttpRequestFactory", (Object)clientHttpRequestFactory.getClass().getName());
        }
    }

    /**
     * Opens the event stream and wraps the response together with a JSON parser over its body.
     *
     * <p>Without a subscription, the cursors known to the cursor manager (if any) are serialized
     * into the {@code X-Nakadi-Cursors} request header. If creating the parser fails, the response
     * is closed before the failure is rethrown; a failure to close is attached as suppressed.
     *
     * @return the opened input; the caller is responsible for closing it
     * @throws IOException if creating or executing the request, or creating the parser, fails
     */
    private JsonInput openJsonInput() throws IOException {
        final ClientHttpRequest request = clientHttpRequestFactory.createRequest(uri, HttpMethod.GET);

        if (!subscription.isPresent()) {
            final Collection<Cursor> cursors = cursorManager.getCursors(eventName);
            if (!cursors.isEmpty()) {
                final String headerValue = objectMapper.writeValueAsString(cursors);
                request.getHeaders().put("X-Nakadi-Cursors", Collections.singletonList(headerValue));
            }
        }

        final ClientHttpResponse response = request.execute();
        try {
            JsonParser parser = jsonFactory.createParser(response.getBody());
            // The response owns the stream, so the parser must not close it on its own.
            parser = parser.disable(JsonParser.Feature.AUTO_CLOSE_SOURCE);
            return new JsonInput(response, parser);
        } catch (Throwable failure) {
            try {
                response.close();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Hands the events of a batch to the listener and reports the outcome to the cursor manager.
     *
     * <p>On success the batch cursor is passed to {@code onSuccess}. An
     * {@code EventAlreadyProcessedException} is only logged. Any other failure is passed to
     * {@code onError} and then rethrown unchanged.
     *
     * @param batch batch whose events are delivered
     * @throws IOException if the listener or the cursor manager throws one
     */
    private void processBatch(Batch<T> batch) throws IOException {
        final Cursor batchCursor = batch.getCursor();
        try {
            listener.accept(batch.getEvents());
            cursorManager.onSuccess(eventName, batchCursor);
        } catch (EventAlreadyProcessedException alreadyProcessed) {
            LOG.info("Events for [{}] partition [{}] at offset [{}] were already processed", eventName, batchCursor.getPartition(), batchCursor.getOffset());
        } catch (Throwable failure) {
            cursorManager.onError(eventName, batchCursor, failure);
            throw failure;
        }
    }

    /**
     * Reads a cursor object ({@code partition} and {@code offset} text fields) from the parser.
     *
     * <p>Fields other than {@code partition} and {@code offset} are logged and skipped, including
     * any nested structure they contain.
     *
     * @param jsonParser parser whose next token must be the start of the cursor object
     * @return the cursor that was read
     * @throws IOException if the parser fails or the stream ends
     * @throws IllegalStateException if the partition or the offset could not be read
     */
    private Cursor readCursor(JsonParser jsonParser) throws IOException {
        expectToken(jsonParser, JsonToken.START_OBJECT);

        String partition = null;
        String offset = null;
        for (JsonToken current = jsonParser.nextToken(); current != JsonToken.END_OBJECT; current = jsonParser.nextToken()) {
            final String field = jsonParser.getCurrentName();
            switch (field) {
                case "partition":
                    partition = jsonParser.nextTextValue();
                    break;
                case "offset":
                    offset = jsonParser.nextTextValue();
                    break;
                default:
                    LOG.warn("Unexpected field [{}] in cursor", field);
                    // Move onto the value, then skip over it if it is an object or array.
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                    break;
            }
        }

        if (partition == null) {
            throw new IllegalStateException("Could not read partition from cursor");
        }
        if (offset == null) {
            throw new IllegalStateException("Could not read offset from cursor for partition [" + partition + "]");
        }
        return new Cursor(partition, offset);
    }

    /**
     * Reads the array of events that follows the current parser position.
     *
     * <p>Events that fail to map are reported to the listener via {@code onMappingException} and
     * reading continues with the next element. Any other {@code IOException} wrapped by the
     * iterator is unwrapped and rethrown; remaining runtime exceptions propagate unchanged.
     *
     * @param jsonParser parser whose next token must be the start of the events array
     * @return the successfully deserialized events, in stream order
     * @throws IOException if the parser fails or the stream ends
     */
    private List<T> readEvents(JsonParser jsonParser) throws IOException {
        expectToken(jsonParser, JsonToken.START_ARRAY);
        // Forget the START_ARRAY token so the value iterator starts at the first element.
        jsonParser.clearCurrentToken();

        final Iterator<T> eventIterator = eventReader.readValues(jsonParser, eventClass);
        final List<T> events = new ArrayList<>();

        while (true) {
            try {
                if (!eventIterator.hasNext()) {
                    return events;
                }
                events.add(eventIterator.next());
            } catch (RuntimeException e) {
                // The iterator wraps checked parsing failures in runtime exceptions.
                final Throwable cause = e.getCause();
                if (cause instanceof JsonMappingException) {
                    // Report the bad event and keep going with the rest of the array.
                    listener.onMappingException((JsonMappingException) cause);
                } else if (cause instanceof IOException) {
                    throw (IOException) cause;
                } else {
                    throw e;
                }
            }
        }
    }

    /**
     * Consumes events without a time limit.
     *
     * <p>Equivalent to calling {@link #run(long, TimeUnit)} with a non-positive timeout.
     *
     * @throws IOException if consuming fails
     */
    public void run() throws IOException {
        final long noTimeout = -1L;
        run(noTimeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Consumes events until the timeout elapses.
     *
     * <p>A {@code BackoffException} raised while reconnecting is unwrapped and its cause is
     * thrown instead.
     *
     * @param timeout how long to consume; a value of zero or less means no time limit
     * @param timeoutUnit unit of {@code timeout}
     * @throws IOException if consuming fails, or the cause of a failed reconnection
     */
    public void run(long timeout, TimeUnit timeoutUnit) throws IOException {
        try {
            runInternal(timeout, timeoutUnit);
        } catch (BackoffException backoffFailure) {
            final IOException cause = backoffFailure.getCause();
            throw cause;
        }
    }

    /**
     * Opens the event stream and processes batches until the timeout elapses, the thread is
     * interrupted, or an unrecoverable failure occurs.
     *
     * <p>Each batch is expected to be a JSON object with a {@code cursor} field optionally
     * followed by an {@code events} field. An {@code IOException} while reading or processing
     * closes the current connection and re-opens it through the backoff strategy; the error count
     * passed to the strategy is reset after every successfully handled batch.
     *
     * <p>Whichever way this method exits, a connection that is still open is closed.
     *
     * @param timeout how long to consume; a value of zero or less means no time limit
     * @param timeoutUnit unit of {@code timeout}
     * @throws IOException if the initial connection cannot be opened
     * @throws BackoffException if the backoff strategy gives up on reconnecting
     */
    @VisibleForTesting
    void runInternal(long timeout, TimeUnit timeoutUnit) throws IOException, BackoffException {
        final long lockedUntil;
        if (timeout <= 0L) {
            lockedUntil = Long.MAX_VALUE;
        } else {
            final long now = System.currentTimeMillis();
            final long timeoutMillis = timeoutUnit.toMillis(timeout);
            // Saturate instead of overflowing: a huge timeout must not produce a deadline in the past.
            lockedUntil = now > Long.MAX_VALUE - timeoutMillis ? Long.MAX_VALUE : now + timeoutMillis;
        }
        LOG.info("Starting to listen for events for [{}]", eventName);

        // Holds the currently open connection; null while no connection is open.
        JsonInput jsonInput = openJsonInput();
        try {
            JsonParser jsonParser = jsonInput.getJsonParser();
            int errorCount = 0;
            while (System.currentTimeMillis() < lockedUntil) {
                try {
                    LOG.debug("Waiting for next batch of events for [{}]", eventName);
                    expectToken(jsonParser, JsonToken.START_OBJECT);
                    expectToken(jsonParser, JsonToken.FIELD_NAME);
                    expectField(jsonParser, "cursor");
                    final Cursor cursor = readCursor(jsonParser);
                    LOG.debug("Cursor for [{}] partition [{}] at offset [{}]", eventName, cursor.getPartition(), cursor.getOffset());
                    metricsCollector.markMessageReceived();

                    final JsonToken token = jsonParser.nextToken();
                    if (token != JsonToken.END_OBJECT) {
                        expectField(jsonParser, "events");
                        final List<T> events = readEvents(jsonParser);
                        metricsCollector.markEventsReceived(events.size());
                        expectToken(jsonParser, JsonToken.END_OBJECT);
                        final Batch<T> batch = new Batch<T>(cursor, Collections.unmodifiableList(events));
                        processBatch(batch);
                        metricsCollector.markMessageSuccessfullyProcessed();
                    } else {
                        // Batch consisted of a cursor only.
                        metricsCollector.markEventsReceived(0);
                    }
                    errorCount = 0;
                } catch (IOException e) {
                    metricsCollector.markErrorWhileConsuming();
                    if (errorCount > 0) {
                        LOG.warn("Got [{}] while reading events for [{}] after [{}] retries", e.getClass().getSimpleName(), eventName, errorCount, e);
                    } else {
                        LOG.info("Got [{}] while reading events for [{}]", e.getClass().getSimpleName(), eventName);
                    }

                    // Clear the reference first so the finally block never closes this input twice.
                    final JsonInput failedInput = jsonInput;
                    jsonInput = null;
                    failedInput.close();

                    if (Thread.currentThread().isInterrupted()) {
                        LOG.warn("Thread was interrupted");
                        break;
                    }
                    try {
                        LOG.debug("Reconnecting after [{}] errors", errorCount);
                        jsonInput = backoffStrategy.call(errorCount, e, this::openJsonInput);
                        jsonParser = jsonInput.getJsonParser();
                        LOG.info("Reconnected after [{}] errors", errorCount);
                        metricsCollector.markReconnection();
                    } catch (InterruptedException interruptedException) {
                        LOG.warn("Interrupted during reconnection", interruptedException);
                        Thread.currentThread().interrupt();
                        return;
                    }
                    ++errorCount;
                }
            }
        } finally {
            // Covers timeout expiry and non-IOException failures, which previously leaked the connection.
            if (jsonInput != null) {
                jsonInput.close();
            }
        }
    }

    /**
     * Verifies that the parser's current field name equals the expected one.
     *
     * @param jsonParser parser positioned on the field to check
     * @param expectedFieldName name the current field must have
     * @throws IOException if the parser fails to provide the current name
     * @throws IllegalStateException if the names differ
     */
    private void expectField(JsonParser jsonParser, String expectedFieldName) throws IOException {
        final String actualFieldName = jsonParser.getCurrentName();
        final boolean matches = expectedFieldName.equals(actualFieldName);
        Preconditions.checkState(matches, "Expected [%s] field but got [%s]", expectedFieldName, actualFieldName);
    }

    /**
     * Advances the parser by one token and verifies that it is the expected one.
     *
     * @param jsonParser parser to advance
     * @param expectedToken token that must come next
     * @throws EOFException if the parser returns no further token; the message depends on whether
     *         the current thread is interrupted
     * @throws IOException if the parser fails
     * @throws IllegalStateException if a different token is found
     */
    private void expectToken(JsonParser jsonParser, JsonToken expectedToken) throws IOException {
        final JsonToken actualToken = jsonParser.nextToken();
        if (actualToken != null) {
            Preconditions.checkState(actualToken == expectedToken, "Expected [%s] but got [%s]", expectedToken, actualToken);
            return;
        }

        final String reason;
        if (Thread.currentThread().isInterrupted()) {
            reason = "Thread was interrupted";
        } else {
            reason = "Stream was closed";
        }
        throw new EOFException(reason);
    }

    /**
     * Pairs an HTTP response with the JSON parser reading its body so both can be closed together.
     */
    static class JsonInput implements Closeable {
        private final ClientHttpResponse response;
        private final JsonParser jsonParser;

        JsonInput(ClientHttpResponse response, JsonParser jsonParser) {
            this.response = response;
            this.jsonParser = jsonParser;
        }

        /** Returns the wrapped HTTP response. */
        ClientHttpResponse getResponse() {
            return response;
        }

        /** Returns the parser reading the response body. */
        JsonParser getJsonParser() {
            return jsonParser;
        }

        /**
         * Closes the parser first and then, in every case, the response.
         *
         * <p>An {@code IOException} from closing the parser is logged and not propagated.
         */
        @Override
        public void close() {
            try {
                closeParser();
            } finally {
                closeResponse();
            }
        }

        /** Closes the parser, logging instead of propagating an {@code IOException}. */
        private void closeParser() {
            try {
                LOG.trace("Trying to close json parser");
                jsonParser.close();
                LOG.trace("Closed json parser");
            } catch (IOException closeFailure) {
                LOG.warn("Could not close json parser", closeFailure);
            }
        }

        /** Closes the HTTP response. */
        private void closeResponse() {
            LOG.trace("Trying to close response");
            response.close();
            LOG.trace("Closed response");
        }
    }
}

