/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.fahrschein;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpResponse;
import org.zalando.fahrschein.CursorManager;
import org.zalando.fahrschein.ExponentialBackoffException;
import org.zalando.fahrschein.ExponentialBackoffStrategy;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.Listener;
import org.zalando.fahrschein.domain.Batch;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Subscription;

/**
 * Reads a continuous stream of JSON batches from {@code uri}, hands the events of each batch to a
 * {@link Listener} and reports the outcome of every batch to the {@link CursorManager}.
 *
 * <p>Each batch is a JSON object that starts with a {@code cursor} field and is optionally followed
 * by an {@code events} array. An {@link IOException} while reading or processing causes the current
 * connection to be closed and a new one to be opened through the {@link ExponentialBackoffStrategy}.
 *
 * @param <T> type the events are deserialized into
 */
public class NakadiReader<T>
implements IORunnable {
    private static final Logger LOG = LoggerFactory.getLogger(NakadiReader.class);
    private final URI uri;
    private final ClientHttpRequestFactory clientHttpRequestFactory;
    private final ExponentialBackoffStrategy exponentialBackoffStrategy;
    private final CursorManager cursorManager;
    private final ObjectMapper objectMapper;
    private final String eventName;
    private final Optional<Subscription> subscription;
    private final Class<T> eventClass;
    private final Listener<T> listener;

    /**
     * Creates a reader for a single event type.
     *
     * @param uri address the stream is requested from
     * @param clientHttpRequestFactory factory used to create the streaming GET request
     * @param exponentialBackoffStrategy strategy used when reconnecting after an error
     * @param cursorManager source of the start cursors and sink for per-batch success/error callbacks
     * @param objectMapper mapper used to serialize cursors and to deserialize events
     * @param eventName name of the event type being consumed
     * @param subscription optional subscription; when present it must contain exactly {@code eventName}
     * @param eventClass class the events are deserialized into
     * @param listener receiver of the events of each batch
     * @throws IllegalStateException if a subscription is present and its only event type differs from
     *         {@code eventName}
     */
    public NakadiReader(URI uri, ClientHttpRequestFactory clientHttpRequestFactory, ExponentialBackoffStrategy exponentialBackoffStrategy, CursorManager cursorManager, ObjectMapper objectMapper, String eventName, Optional<Subscription> subscription, Class<T> eventClass, Listener<T> listener) {
        this.uri = uri;
        this.clientHttpRequestFactory = clientHttpRequestFactory;
        this.exponentialBackoffStrategy = exponentialBackoffStrategy;
        this.cursorManager = cursorManager;
        this.objectMapper = objectMapper;
        this.eventName = eventName;
        this.subscription = subscription;
        this.eventClass = eventClass;
        this.listener = listener;
        Preconditions.checkState(!subscription.isPresent() || eventName.equals(Iterables.getOnlyElement(subscription.get().getEventTypes())));
    }

    /**
     * Opens the stream through the backoff strategy, passing the number of errors seen so far.
     *
     * @param errorCount number of consecutive errors before this attempt
     * @return the open response
     * @throws InterruptedException if the backoff strategy's call is interrupted
     * @throws IOException the cause carried by the {@link ExponentialBackoffException}
     */
    private ClientHttpResponse openStream(int errorCount) throws InterruptedException, IOException {
        try {
            return exponentialBackoffStrategy.call(errorCount, this::openStream);
        }
        catch (ExponentialBackoffException e) {
            // NOTE(review): relies on ExponentialBackoffException#getCause() being declared with a
            // type compatible with this method's throws clause - confirm against that class.
            throw e.getCause();
        }
    }

    /**
     * Issues a single GET request against {@code uri}.
     *
     * <p>Without a subscription, the cursors known to the cursor manager (if any) are serialized to
     * JSON and sent in the {@code X-Nakadi-Cursors} header.
     *
     * @return the response of the executed request
     * @throws IOException if creating or executing the request, or serializing the cursors, fails
     */
    private ClientHttpResponse openStream() throws IOException {
        final ClientHttpRequest request = clientHttpRequestFactory.createRequest(uri, HttpMethod.GET);
        if (!subscription.isPresent()) {
            final Collection<Cursor> cursors = cursorManager.getCursors(eventName);
            if (!cursors.isEmpty()) {
                final String value = objectMapper.writeValueAsString(cursors);
                request.getHeaders().put("X-Nakadi-Cursors", Collections.singletonList(value));
            }
        }
        return request.execute();
    }

    /**
     * Passes the events of a batch to the listener and reports success or failure for the batch's
     * cursor to the cursor manager. Any failure is rethrown after it has been reported.
     *
     * @param batch batch to process
     * @throws IOException if the listener or the cursor manager fails with an I/O error
     */
    private void processBatch(Batch<T> batch) throws IOException {
        final Cursor cursor = batch.getCursor();
        try {
            listener.onEvent(batch.getEvents());
            cursorManager.onSuccess(eventName, cursor);
        }
        catch (Throwable throwable) {
            cursorManager.onError(eventName, cursor, throwable);
            throw throwable;
        }
    }

    /**
     * Reads a cursor object ({@code partition} and {@code offset} text fields) from the parser.
     * Unknown fields are logged and skipped, including any nested structure.
     *
     * @param jsonParser parser positioned just before the cursor's start-object token
     * @return the parsed cursor
     * @throws IOException if the parser fails
     * @throws IllegalStateException if the next token is not a start-object, or if partition or
     *         offset could not be read
     */
    private Cursor readCursor(JsonParser jsonParser) throws IOException {
        String partition = null;
        String offset = null;
        final JsonToken token = jsonParser.nextToken();
        Preconditions.checkState(token == JsonToken.START_OBJECT);
        while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
            final String field = jsonParser.getCurrentName();
            switch (field) {
                case "partition":
                    partition = jsonParser.nextTextValue();
                    break;
                case "offset":
                    offset = jsonParser.nextTextValue();
                    break;
                default:
                    LOG.warn("Unexpected field [{}] in cursor", field);
                    // Move to the field's value and skip it entirely if it is an object or array.
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                    break;
            }
        }
        if (partition == null) {
            throw new IllegalStateException("Could not read partition from cursor");
        }
        if (offset == null) {
            throw new IllegalStateException("Could not read offset from cursor for partition [" + partition + "]");
        }
        return new Cursor(partition, offset);
    }

    /**
     * Reads an array of events, deserializing every element that starts with a start-object token
     * into {@code eventClass}. Reading stops at the first token that is not a start-object.
     *
     * @param objectReader reader used to deserialize each event
     * @param jsonParser parser positioned just before the array's start token
     * @return the events in stream order
     * @throws IOException if parsing or deserialization fails
     * @throws IllegalStateException if the next token is not a start-array
     */
    private List<T> readEvents(ObjectReader objectReader, JsonParser jsonParser) throws IOException {
        final JsonToken token = jsonParser.nextToken();
        Preconditions.checkState(token == JsonToken.START_ARRAY);
        final List<T> events = new ArrayList<>();
        while (jsonParser.nextToken() == JsonToken.START_OBJECT) {
            final T event = objectReader.readValue(jsonParser, eventClass);
            events.add(event);
        }
        return events;
    }

    /**
     * Reads one batch object from the parser and, if it carries events, processes it.
     * A batch that consists of a cursor only is read and logged but not passed on.
     *
     * @param objectReader reader used to deserialize the events
     * @param jsonParser parser positioned just before the batch's start-object token
     * @throws IOException if the stream has ended, or if parsing or processing fails
     * @throws IllegalStateException if the batch does not have the expected structure
     */
    private void readBatch(ObjectReader objectReader, JsonParser jsonParser) throws IOException {
        JsonToken token = jsonParser.nextToken();
        if (token == null) {
            throw new IOException("Stream was closed");
        }
        Preconditions.checkState(token == JsonToken.START_OBJECT, "Expected [%s] but got [%s]", JsonToken.START_OBJECT, token);

        String field = jsonParser.nextFieldName();
        Preconditions.checkState("cursor".equals(field), "Expected [cursor] field but got [%s]", field);
        final Cursor cursor = readCursor(jsonParser);
        LOG.debug("Cursor for partition [{}] at offset [{}]", cursor.getPartition(), cursor.getOffset());

        token = jsonParser.nextToken();
        if (token == JsonToken.END_OBJECT) {
            // Batch without events: nothing to hand to the listener.
            return;
        }
        field = jsonParser.getCurrentName();
        Preconditions.checkState("events".equals(field), "Expected [events] field but got [%s]", field);
        final List<T> events = readEvents(objectReader, jsonParser);

        token = jsonParser.nextToken();
        Preconditions.checkState(token == JsonToken.END_OBJECT, "Expected [%s] but got [%s]", JsonToken.END_OBJECT, token);

        processBatch(new Batch<T>(cursor, Collections.unmodifiableList(events)));
    }

    /**
     * Closes the parser and then the response, tolerating {@code null} for either.
     * A failure to close the parser is logged and does not prevent the response from being closed.
     */
    private static void closeQuietly(JsonParser jsonParser, ClientHttpResponse response) {
        try {
            if (jsonParser != null) {
                LOG.debug("Trying to close json parser");
                jsonParser.close();
            }
        }
        catch (IOException e) {
            LOG.warn("Could not close json parser on IOException", e);
        }
        finally {
            if (response != null) {
                LOG.debug("Trying to close response");
                response.close();
            }
        }
    }

    /**
     * Opens the stream and reads batches until the method is left by an exception or by an
     * interruption during reconnect.
     *
     * <p>An {@link IOException} while reading a batch closes the current parser and response and
     * reconnects via the backoff strategy; the error counter is reset after every successfully
     * read batch. The parser and response that are open when this method exits are always closed.
     *
     * @throws IOException if the initial connection fails or if reconnecting fails
     * @throws IllegalStateException if the stream does not have the expected structure
     */
    @Override
    public void run() throws IOException {
        // Work on a copy so that disabling AUTO_CLOSE_SOURCE does not touch the caller's mapper;
        // the underlying stream is closed through the response instead of through the parser.
        final JsonFactory jsonFactory = objectMapper.copy().getFactory().configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
        final ObjectReader objectReader = objectMapper.reader().forType(eventClass);

        ClientHttpResponse response = null;
        JsonParser jsonParser = null;
        int errorCount = 0;
        try {
            response = openStream();
            jsonParser = jsonFactory.createParser(response.getBody());
            while (true) {
                try {
                    readBatch(objectReader, jsonParser);
                    errorCount = 0;
                }
                catch (IOException e) {
                    LOG.warn("Got [{}] while reading events", e.getClass().getSimpleName(), e);
                    closeQuietly(jsonParser, response);
                    // Forget the closed resources so the outer finally does not close them again.
                    jsonParser = null;
                    response = null;
                    try {
                        LOG.info("Reconnecting after [{}] errors", errorCount);
                        response = openStream(errorCount);
                        jsonParser = jsonFactory.createParser(response.getBody());
                    }
                    catch (InterruptedException interrupted) {
                        LOG.warn("Interrupted during reconnection");
                        Thread.currentThread().interrupt();
                        return;
                    }
                    ++errorCount;
                }
            }
        }
        finally {
            // Covers exits that bypass the IOException handler, e.g. unchecked exceptions.
            closeQuietly(jsonParser, response);
        }
    }
}

