/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.fahrschein;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpResponse;
import org.zalando.fahrschein.BackoffException;
import org.zalando.fahrschein.BackoffStrategy;
import org.zalando.fahrschein.CursorManager;
import org.zalando.fahrschein.DefaultObjectMapper;
import org.zalando.fahrschein.ErrorHandler;
import org.zalando.fahrschein.EventAlreadyProcessedException;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.Listener;
import org.zalando.fahrschein.MetricsCollector;
import org.zalando.fahrschein.Preconditions;
import org.zalando.fahrschein.domain.Batch;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Lock;
import org.zalando.fahrschein.domain.Partition;
import org.zalando.fahrschein.domain.Subscription;

/**
 * Reads batches of events from a streaming HTTP endpoint and hands them to a {@link Listener}.
 *
 * <p>The reader issues a {@code GET} request against the configured {@link URI}, parses the
 * response body incrementally as a sequence of JSON objects (one per batch, each carrying a
 * {@code cursor} and optionally {@code events}), and reports progress to the
 * {@link CursorManager} after the listener accepted a batch. I/O failures while consuming are
 * answered by closing the connection and reconnecting through the {@link BackoffStrategy}.
 *
 * @param <T> the type events are deserialized into
 */
class NakadiReader<T> implements IORunnable {
    private static final Logger LOG = LoggerFactory.getLogger(NakadiReader.class);
    private static final TypeReference<Collection<Cursor>> COLLECTION_OF_CURSORS = new TypeReference<Collection<Cursor>>() {};

    private final URI uri;
    private final ClientHttpRequestFactory clientHttpRequestFactory;
    private final BackoffStrategy backoffStrategy;
    private final CursorManager cursorManager;
    private final Set<String> eventNames;
    private final Optional<Subscription> subscription;
    private final Optional<Lock> lock;
    private final Class<T> eventClass;
    private final Listener<T> listener;
    private final ErrorHandler errorHandler;
    private final JsonFactory jsonFactory;
    private final ObjectReader eventReader;
    private final ObjectWriter cursorHeaderWriter;
    private final MetricsCollector metricsCollector;

    /**
     * Creates a reader for the given stream.
     *
     * <p>Without a subscription exactly one event name must be given, otherwise the precondition
     * check fails. The supplied {@code objectMapper} provides the {@link JsonFactory} and the
     * reader used for events; the cursor header is always written with
     * {@link DefaultObjectMapper#INSTANCE}.
     */
    NakadiReader(URI uri, ClientHttpRequestFactory clientHttpRequestFactory, BackoffStrategy backoffStrategy, CursorManager cursorManager, ObjectMapper objectMapper, Set<String> eventNames, Optional<Subscription> subscription, Optional<Lock> lock, Class<T> eventClass, Listener<T> listener, ErrorHandler errorHandler, MetricsCollector metricsCollector) {
        Preconditions.checkState(subscription.isPresent() || eventNames.size() == 1, "Low level api only supports reading from a single event");
        this.uri = uri;
        this.clientHttpRequestFactory = clientHttpRequestFactory;
        this.backoffStrategy = backoffStrategy;
        this.cursorManager = cursorManager;
        this.eventNames = eventNames;
        this.subscription = subscription;
        this.lock = lock;
        this.eventClass = eventClass;
        this.listener = listener;
        this.errorHandler = errorHandler;
        this.metricsCollector = metricsCollector;
        this.jsonFactory = objectMapper.getFactory();
        this.eventReader = objectMapper.reader().forType(eventClass);
        this.cursorHeaderWriter = DefaultObjectMapper.INSTANCE.writerFor(COLLECTION_OF_CURSORS);
    }

    /**
     * Extracts the first {@code X-Nakadi-StreamId} header value of the response, if any.
     */
    private static Optional<String> getStreamId(ClientHttpResponse response) {
        HttpHeaders headers = response.getHeaders();
        if (headers == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(headers.getFirst("X-Nakadi-StreamId"));
    }

    /**
     * Opens a new connection to the stream and wraps the response in a {@link JsonInput}.
     *
     * <p>When a cursors header is available it is sent as {@code X-Nakadi-Cursors}. When reading
     * through a subscription and the response carries a stream id, that id is registered with
     * the cursor manager.
     *
     * @throws IOException if the request cannot be created or executed, or registering the
     *                     stream id fails
     */
    private JsonInput openJsonInput() throws IOException {
        String cursorsHeader = getCursorsHeader();
        ClientHttpRequest request = clientHttpRequestFactory.createRequest(uri, HttpMethod.GET);
        if (cursorsHeader != null) {
            request.getHeaders().put("X-Nakadi-Cursors", Collections.singletonList(cursorsHeader));
        }
        ClientHttpResponse response = request.execute();
        try {
            Optional<String> streamId = getStreamId(response);
            if (subscription.isPresent() && streamId.isPresent()) {
                cursorManager.addStreamId(subscription.get(), streamId.get());
            }
            return new JsonInput(jsonFactory, response);
        } catch (Throwable throwable) {
            // Ownership of the response was not handed over to a JsonInput, so release it here
            // and keep the original failure as the primary exception.
            try {
                response.close();
            } catch (Throwable suppressed) {
                throwable.addSuppressed(suppressed);
            }
            throw throwable;
        }
    }

    /**
     * Builds the serialized cursors for the request header.
     *
     * @return the JSON representation of the cursors, or {@code null} when reading through a
     *         subscription or when there are no cursors to send
     */
    @Nullable
    private String getCursorsHeader() throws IOException {
        if (subscription.isPresent()) {
            return null;
        }
        Collection<Cursor> lockedCursors = getLockedCursors();
        if (lockedCursors.isEmpty()) {
            return null;
        }
        return cursorHeaderWriter.writeValueAsString(lockedCursors);
    }

    /**
     * Returns the cursors to start reading from.
     *
     * <p>Without a lock these are the cursors known to the cursor manager. With a lock, one
     * cursor per locked partition is returned, using the stored offset for that partition or
     * {@code "BEGIN"} when none is stored.
     */
    private Collection<Cursor> getLockedCursors() throws IOException {
        Collection<Cursor> cursors = cursorManager.getCursors(eventNames.iterator().next());
        if (!lock.isPresent()) {
            return cursors;
        }
        Map<String, String> offsets = cursors.stream().collect(Collectors.toMap(Cursor::getPartition, Cursor::getOffset));
        List<Partition> partitions = lock.get().getPartitions();
        return partitions.stream()
                .map(partition -> new Cursor(partition.getPartition(), offsets.getOrDefault(partition.getPartition(), "BEGIN")))
                .collect(Collectors.toList());
    }

    /**
     * Returns the event type named by the cursor, falling back to the first configured event
     * name when the cursor does not carry one.
     */
    private String getCurrentEventName(Cursor cursor) {
        String eventName = cursor.getEventType();
        return eventName != null ? eventName : eventNames.iterator().next();
    }

    /**
     * Passes the events of a batch to the listener and, on success, reports the cursor to the
     * cursor manager.
     *
     * <p>An {@link EventAlreadyProcessedException} is logged and swallowed; any other failure is
     * logged and rethrown unchanged.
     */
    private void processBatch(Batch<T> batch) throws IOException {
        Cursor cursor = batch.getCursor();
        String eventName = getCurrentEventName(cursor);
        try {
            listener.accept(batch.getEvents());
            cursorManager.onSuccess(eventName, cursor);
        } catch (EventAlreadyProcessedException e) {
            LOG.info("Events for [{}] partition [{}] at offset [{}] were already processed", eventName, cursor.getPartition(), cursor.getOffset());
        } catch (Throwable throwable) {
            LOG.warn("Exception while processing events for [{}] on partition [{}] at offset [{}]", eventName, cursor.getPartition(), cursor.getOffset(), throwable);
            throw throwable;
        }
    }

    /**
     * Parses a cursor object from the parser, whose next token must be {@code START_OBJECT}.
     *
     * <p>Unknown fields are logged and skipped.
     *
     * @throws IllegalStateException if the cursor has no partition or no offset
     */
    private Cursor readCursor(JsonParser jsonParser) throws IOException {
        String partition = null;
        String offset = null;
        String eventType = null;
        String cursorToken = null;

        expectToken(jsonParser, JsonToken.START_OBJECT);
        while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
            String field = jsonParser.getCurrentName();
            switch (field) {
                case "partition":
                    partition = jsonParser.nextTextValue();
                    break;
                case "offset":
                    offset = jsonParser.nextTextValue();
                    break;
                case "event_type":
                    eventType = jsonParser.nextTextValue();
                    break;
                case "cursor_token":
                    cursorToken = jsonParser.nextTextValue();
                    break;
                default:
                    LOG.warn("Unexpected field [{}] in cursor", field);
                    // Advance to the value and skip it entirely, including nested structures.
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                    break;
            }
        }

        if (partition == null) {
            throw new IllegalStateException("Could not read partition from cursor");
        }
        if (offset == null) {
            throw new IllegalStateException("Could not read offset from cursor for partition [" + partition + "]");
        }
        return new Cursor(partition, offset, eventType, cursorToken);
    }

    /**
     * Parses the events array from the parser, whose next token must be {@code START_ARRAY}.
     *
     * <p>A {@link JsonMappingException} wrapped in a {@link RuntimeException} by the iterator is
     * reported to the {@link ErrorHandler} and reading continues with the next element. A
     * wrapped {@link IOException} is unwrapped and rethrown; any other runtime exception is
     * propagated as is.
     */
    private List<T> readEvents(JsonParser jsonParser) throws IOException {
        expectToken(jsonParser, JsonToken.START_ARRAY);
        jsonParser.clearCurrentToken();

        Iterator<T> eventIterator = eventReader.readValues(jsonParser, eventClass);
        List<T> events = new ArrayList<>();
        while (true) {
            try {
                // hasNext() itself may throw, which is why it lives inside the try block.
                if (!eventIterator.hasNext()) {
                    break;
                }
                events.add(eventClass.cast(eventIterator.next()));
            } catch (RuntimeException e) {
                Throwable cause = e.getCause();
                if (cause instanceof JsonMappingException) {
                    errorHandler.onMappingException((JsonMappingException) cause);
                } else if (cause instanceof IOException) {
                    throw (IOException) cause;
                } else {
                    throw e;
                }
            }
        }
        return events;
    }

    /**
     * Runs the read loop and rethrows the cause of a {@link BackoffException} raised by it.
     */
    @Override
    public void run() throws IOException {
        try {
            runInternal();
        } catch (BackoffException e) {
            throw e.getCause();
        }
    }

    /**
     * Connects to the stream and processes batches until the thread is interrupted or an
     * unrecoverable error occurs.
     *
     * <p>On an {@link IOException} the current connection is closed and a new one is opened via
     * the {@link BackoffStrategy}; the error count passed to the strategy is reset after every
     * successfully read batch. Any other throwable closes the connection and is rethrown. The
     * initial connection attempt is made outside the retry handling.
     *
     * @throws IOException      if the initial connection fails
     * @throws BackoffException if the backoff strategy raises it while reconnecting
     */
    void runInternal() throws IOException, BackoffException {
        LOG.info("Starting to listen for events for {}", eventNames);
        JsonInput jsonInput = openJsonInput();
        int errorCount = 0;

        while (true) {
            try {
                JsonParser jsonParser = jsonInput.getJsonParser();
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedIOException("Interrupted");
                }
                readBatch(jsonParser);
                errorCount = 0;
            } catch (IOException e) {
                metricsCollector.markErrorWhileConsuming();
                if (errorCount > 0) {
                    LOG.warn("Got [{}] [{}] while reading events for {} after [{}] retries", e.getClass().getSimpleName(), e.getMessage(), eventNames, errorCount, e);
                } else {
                    LOG.info("Got [{}] [{}] while reading events for {}", e.getClass().getSimpleName(), e.getMessage(), eventNames, e);
                }
                jsonInput.close();

                if (Thread.currentThread().isInterrupted()) {
                    LOG.warn("Thread was interrupted");
                    return;
                }

                try {
                    LOG.debug("Reconnecting after [{}] errors", errorCount);
                    jsonInput = backoffStrategy.call(errorCount, e, this::openJsonInput);
                    LOG.info("Reconnected after [{}] errors", errorCount);
                    metricsCollector.markReconnection();
                } catch (InterruptedException interruptedException) {
                    LOG.warn("Interrupted during reconnection", interruptedException);
                    // Restore the interrupt flag for callers further up the stack.
                    Thread.currentThread().interrupt();
                    return;
                }
                ++errorCount;
            } catch (Throwable e) {
                try {
                    jsonInput.close();
                } catch (Throwable suppressed) {
                    // Attach the close failure to the primary error. Passing the primary error
                    // itself would make addSuppressed throw IllegalArgumentException and hide it.
                    e.addSuppressed(suppressed);
                }
                throw e;
            }
        }
    }

    /**
     * Opens a connection, reads exactly one batch and closes the connection again.
     *
     * @throws IOException if connecting or reading fails; the failure is also counted in the
     *                     metrics collector
     */
    void readSingleBatch() throws IOException {
        try (JsonInput jsonInput = openJsonInput()) {
            JsonParser jsonParser = jsonInput.getJsonParser();
            readBatch(jsonParser);
        } catch (IOException e) {
            metricsCollector.markErrorWhileConsuming();
            throw e;
        }
    }

    /**
     * Reads one batch object from the parser and processes it.
     *
     * <p>The {@code cursor} and {@code events} fields are parsed, {@code info} and unknown
     * fields are skipped. A batch without an {@code events} field only updates the metrics; the
     * listener is not invoked for it.
     *
     * @throws IOException if the stream ended, an unexpected token was found, or the batch has
     *                     no cursor
     */
    private void readBatch(JsonParser jsonParser) throws IOException {
        LOG.debug("Waiting for next batch of events for {}", eventNames);
        expectToken(jsonParser, JsonToken.START_OBJECT);
        metricsCollector.markMessageReceived();

        Cursor cursor = null;
        List<T> events = null;
        while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
            String field = jsonParser.getCurrentName();
            switch (field) {
                case "cursor":
                    cursor = readCursor(jsonParser);
                    break;
                case "events":
                    events = readEvents(jsonParser);
                    break;
                case "info":
                    LOG.debug("Skipping stream info in event batch");
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                    break;
                default:
                    LOG.warn("Unexpected field [{}] in event batch", field);
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                    break;
            }
        }

        if (cursor == null) {
            throw new IOException("Could not read cursor");
        }

        String eventName = getCurrentEventName(cursor);
        LOG.debug("Cursor for [{}] partition [{}] at offset [{}]", eventName, cursor.getPartition(), cursor.getOffset());

        if (events == null) {
            metricsCollector.markEventsReceived(0);
            return;
        }
        metricsCollector.markEventsReceived(events.size());
        Batch<T> batch = new Batch<T>(cursor, Collections.unmodifiableList(events));
        processBatch(batch);
        metricsCollector.markMessageSuccessfullyProcessed();
    }

    /**
     * Advances the parser by one token and verifies it is the expected one.
     *
     * @throws InterruptedIOException if no token is available and the thread is interrupted
     * @throws EOFException           if no token is available otherwise
     * @throws IOException            if a different token was read
     */
    private void expectToken(JsonParser jsonParser, JsonToken expectedToken) throws IOException {
        JsonToken token = jsonParser.nextToken();
        if (token == null) {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedIOException("Thread was interrupted");
            }
            throw new EOFException("Stream was closed");
        }
        if (token != expectedToken) {
            throw new IOException(String.format("Expected [%s] but got [%s]", expectedToken, token));
        }
    }

    /**
     * Couples an HTTP response with a lazily created JSON parser over its body so that both can
     * be closed together.
     */
    static class JsonInput implements Closeable {
        private final JsonFactory jsonFactory;
        private final ClientHttpResponse response;
        private JsonParser jsonParser;

        JsonInput(JsonFactory jsonFactory, ClientHttpResponse response) {
            this.jsonFactory = jsonFactory;
            this.response = response;
        }

        /** Returns the underlying HTTP response. */
        ClientHttpResponse getResponse() {
            return response;
        }

        /**
         * Returns the parser over the response body, creating it on first use.
         *
         * <p>The parser is created with {@code AUTO_CLOSE_SOURCE} disabled; the response is
         * closed explicitly in {@link #close()}.
         */
        JsonParser getJsonParser() throws IOException {
            if (jsonParser == null) {
                jsonParser = jsonFactory.createParser(response.getBody()).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE);
            }
            return jsonParser;
        }

        /**
         * Closes the parser, if one was created, and then the response. A failure to close the
         * parser is logged and does not prevent the response from being closed.
         */
        @Override
        public void close() {
            try {
                if (jsonParser != null) {
                    try {
                        LOG.trace("Trying to close json parser");
                        jsonParser.close();
                        LOG.trace("Closed json parser");
                    } catch (IOException e) {
                        LOG.warn("Could not close json parser", e);
                    }
                }
            } finally {
                LOG.trace("Trying to close response");
                response.close();
                LOG.trace("Closed response");
            }
        }
    }
}

