/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.fahrschein;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.fahrschein.BackoffException;
import org.zalando.fahrschein.BackoffStrategy;
import org.zalando.fahrschein.BatchHandler;
import org.zalando.fahrschein.CursorManager;
import org.zalando.fahrschein.DefaultBatchHandler;
import org.zalando.fahrschein.DefaultObjectMapper;
import org.zalando.fahrschein.EventAlreadyProcessedException;
import org.zalando.fahrschein.EventReader;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.JsonParserHelper;
import org.zalando.fahrschein.Listener;
import org.zalando.fahrschein.MappingEventReader;
import org.zalando.fahrschein.MetricsCollector;
import org.zalando.fahrschein.NoMetricsCollector;
import org.zalando.fahrschein.Preconditions;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Lock;
import org.zalando.fahrschein.domain.Partition;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.fahrschein.http.api.Headers;
import org.zalando.fahrschein.http.api.Request;
import org.zalando.fahrschein.http.api.RequestFactory;
import org.zalando.fahrschein.http.api.Response;

class NakadiReader<T>
implements IORunnable {
    // Logger used by the reader itself and by the nested JsonInput helper.
    private static final Logger LOG = LoggerFactory.getLogger(NakadiReader.class);
    // Type token used to obtain an ObjectWriter for a collection of cursors (see cursorHeaderWriter).
    private static final TypeReference<Collection<Cursor>> COLLECTION_OF_CURSORS = new TypeReference<Collection<Cursor>>(){};
    // Target of the GET request issued when a stream is opened.
    private final URI uri;
    // Creates the request that opens the stream.
    private final RequestFactory requestFactory;
    // Drives reconnection attempts after an IOException while reading.
    private final BackoffStrategy backoffStrategy;
    // Supplies stored cursors, receives successfully processed cursors and stream ids.
    private final CursorManager cursorManager;
    // Event names being read; the first one is the fallback when a cursor carries no event type.
    private final Set<String> eventNames;
    // When present, no cursors header is sent and the stream id of the response is registered.
    private final Optional<Subscription> subscription;
    // When present, the cursors sent are restricted to the partitions of this lock.
    private final Optional<Lock> lock;
    // Parses the "events" field of a batch into a list of T.
    private final EventReader<T> eventReader;
    // Receives the events of every batch that contains an "events" field.
    private final Listener<T> listener;
    // Wraps the listener invocation and cursor commit of each batch.
    private final BatchHandler batchHandler;
    // Creates the JsonParser over the response body.
    private final JsonFactory jsonFactory;
    // Serializes the cursors into the value of the "X-Nakadi-Cursors" request header.
    private final ObjectWriter cursorHeaderWriter;
    // Notified about received messages, received events, errors and reconnections.
    private final MetricsCollector metricsCollector;

    /**
     * Convenience constructor: wraps {@code eventClass} and {@code objectMapper} in a
     * {@link MappingEventReader} and delegates to the full constructor with
     * {@link DefaultBatchHandler#INSTANCE} and {@link NoMetricsCollector#NO_METRICS_COLLECTOR}.
     */
    NakadiReader(
            URI uri,
            RequestFactory requestFactory,
            BackoffStrategy backoffStrategy,
            CursorManager cursorManager,
            ObjectMapper objectMapper,
            Set<String> eventNames,
            Optional<Subscription> subscription,
            Optional<Lock> lock,
            Class<T> eventClass,
            Listener<T> listener) {
        this(
                uri,
                requestFactory,
                backoffStrategy,
                cursorManager,
                eventNames,
                subscription,
                lock,
                new MappingEventReader<T>(eventClass, objectMapper),
                listener,
                DefaultBatchHandler.INSTANCE,
                NoMetricsCollector.NO_METRICS_COLLECTOR);
    }

    NakadiReader(URI uri, RequestFactory requestFactory, BackoffStrategy backoffStrategy, CursorManager cursorManager, Set<String> eventNames, Optional<Subscription> subscription, Optional<Lock> lock, EventReader<T> eventReader, Listener<T> listener, BatchHandler batchHandler, MetricsCollector metricsCollector) {
        Preconditions.checkState(subscription.isPresent() || eventNames.size() == 1, "Low level api only supports reading from a single event");
        this.uri = uri;
        this.requestFactory = requestFactory;
        this.backoffStrategy = backoffStrategy;
        this.cursorManager = cursorManager;
        this.eventNames = eventNames;
        this.subscription = subscription;
        this.lock = lock;
        this.eventReader = eventReader;
        this.listener = listener;
        this.batchHandler = batchHandler;
        this.metricsCollector = metricsCollector;
        this.jsonFactory = DefaultObjectMapper.INSTANCE.getFactory();
        this.cursorHeaderWriter = DefaultObjectMapper.INSTANCE.writerFor(COLLECTION_OF_CURSORS);
    }

    /**
     * Extracts the first value of the {@code X-Nakadi-StreamId} header from the response.
     *
     * @param response the response whose headers are inspected
     * @return the stream id, or an empty optional when the response has no headers object or
     *         the header lookup yields {@code null}
     */
    private static Optional<String> getStreamId(Response response) {
        final Headers responseHeaders = response.getHeaders();
        if (responseHeaders == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(responseHeaders.getFirst("X-Nakadi-StreamId"));
    }

    /**
     * Opens the event stream: issues a GET request against {@code uri}, adding the
     * {@code X-Nakadi-Cursors} header when a cursors header value is available, and wraps the
     * response in a {@link JsonInput}.
     *
     * <p>When a subscription is present and the response carries a stream id, the id is handed
     * to the cursor manager. If anything fails after the response was obtained, the response is
     * closed before the failure is rethrown; a failure of that close is attached as suppressed.
     *
     * @return the opened input; the caller is responsible for closing it
     * @throws IOException if building the cursors header or executing the request fails
     */
    private JsonInput openJsonInput() throws IOException {
        final String cursors = this.getCursorsHeader();
        final Request streamRequest = this.requestFactory.createRequest(this.uri, "GET");
        if (cursors != null) {
            streamRequest.getHeaders().put("X-Nakadi-Cursors", cursors);
        }

        final Response streamResponse = streamRequest.execute();
        try {
            final Optional<String> responseStreamId = NakadiReader.getStreamId(streamResponse);
            if (this.subscription.isPresent() && responseStreamId.isPresent()) {
                this.cursorManager.addStreamId(this.subscription.get(), responseStreamId.get());
            }
            return new JsonInput(this.jsonFactory, streamResponse);
        } catch (Throwable failure) {
            // Do not leak the open response when setup fails half way.
            try {
                streamResponse.close();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Builds the value of the cursors request header.
     *
     * @return the serialized cursors, or {@code null} when a subscription is present or there
     *         are no cursors to send
     * @throws IOException if the cursors cannot be loaded or serialized
     */
    @Nullable
    private String getCursorsHeader() throws IOException {
        // With a subscription the cursors are never looked up at all.
        if (this.subscription.isPresent()) {
            return null;
        }
        final Collection<Cursor> cursors = this.getLockedCursors();
        if (cursors.isEmpty()) {
            return null;
        }
        return this.cursorHeaderWriter.writeValueAsString(cursors);
    }

    /**
     * Loads the cursors stored for the first event name. Without a lock they are returned as
     * is; with a lock, one cursor per locked partition is returned, using the stored offset for
     * that partition or {@code "BEGIN"} when none is stored.
     *
     * @return the cursors to start reading from
     * @throws IOException if the cursor manager fails to provide the cursors
     */
    private Collection<Cursor> getLockedCursors() throws IOException {
        final String eventName = this.eventNames.iterator().next();
        final Collection<Cursor> storedCursors = this.cursorManager.getCursors(eventName);
        if (!this.lock.isPresent()) {
            return storedCursors;
        }

        final Map<String, String> offsetByPartition = storedCursors.stream()
                .collect(Collectors.toMap(Cursor::getPartition, Cursor::getOffset));
        final List<Partition> lockedPartitions = this.lock.get().getPartitions();
        return lockedPartitions.stream()
                .map(locked -> new Cursor(locked.getPartition(), offsetByPartition.getOrDefault(locked.getPartition(), "BEGIN")))
                .collect(Collectors.toList());
    }

    /**
     * Resolves the event name a cursor belongs to.
     *
     * @param cursor the cursor of the current batch
     * @return the event type stored in the cursor, or the first configured event name when the
     *         cursor has none
     */
    private String getCurrentEventName(Cursor cursor) {
        final String fromCursor = cursor.getEventType();
        if (fromCursor == null) {
            return this.eventNames.iterator().next();
        }
        return fromCursor;
    }

    /**
     * Hands the batch to the batch handler. The submitted task passes the events to the
     * listener and then reports the cursor as successfully processed.
     *
     * <p>An {@link EventAlreadyProcessedException} is logged at info level and swallowed; any
     * other failure is logged at warn level and rethrown.
     *
     * @param batch the batch to process
     * @throws IOException if the batch handler propagates a failure of the task
     */
    private void processBatch(final Batch<T> batch) throws IOException {
        final Cursor batchCursor = batch.getCursor();
        final String name = this.getCurrentEventName(batchCursor);
        this.batchHandler.processBatch(() -> {
            try {
                this.listener.accept(batch.getEvents());
                this.cursorManager.onSuccess(name, batchCursor);
            } catch (EventAlreadyProcessedException alreadyProcessed) {
                LOG.info("Events for [{}] partition [{}] at offset [{}] were already processed", name, batchCursor.getPartition(), batchCursor.getOffset());
            } catch (Throwable failure) {
                // The trailing throwable is picked up by the logger as the exception to print.
                LOG.warn("Exception while processing events for [{}] on partition [{}] at offset [{}]", name, batchCursor.getPartition(), batchCursor.getOffset(), failure);
                throw failure;
            }
        });
    }

    /**
     * Reads a cursor object from the parser. The fields {@code partition}, {@code offset},
     * {@code event_type} and {@code cursor_token} are read as text values; any other field is
     * logged at warn level and skipped together with its value.
     *
     * @param jsonParser parser that is checked via {@code JsonParserHelper.expectToken} for the
     *                   start of the cursor object
     * @return the cursor built from the fields that were read
     * @throws IOException           if the parser fails
     * @throws IllegalStateException if the cursor has no partition or no offset
     */
    private Cursor readCursor(JsonParser jsonParser) throws IOException {
        String partition = null;
        String offset = null;
        String eventType = null;
        String cursorToken = null;

        JsonParserHelper.expectToken(jsonParser, JsonToken.START_OBJECT);
        while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
            final String fieldName = jsonParser.getCurrentName();
            switch (fieldName) {
                case "partition":
                    partition = jsonParser.nextTextValue();
                    break;
                case "offset":
                    offset = jsonParser.nextTextValue();
                    break;
                case "event_type":
                    eventType = jsonParser.nextTextValue();
                    break;
                case "cursor_token":
                    cursorToken = jsonParser.nextTextValue();
                    break;
                default:
                    LOG.warn("Unexpected field [{}] in cursor", fieldName);
                    // Move onto the value, then skip it entirely if it is an object or array.
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                    break;
            }
        }

        if (partition == null) {
            throw new IllegalStateException("Could not read partition from cursor");
        }
        if (offset == null) {
            throw new IllegalStateException("Could not read offset from cursor for partition [" + partition + "]");
        }
        return new Cursor(partition, offset, eventType, cursorToken);
    }

    /**
     * Runs the reader loop. A {@link BackoffException} raised by the loop is unwrapped and its
     * cause is thrown instead.
     *
     * @throws IOException the cause of the backoff failure
     */
    @Override
    public void run() throws IOException {
        try {
            this.runInternal();
        } catch (BackoffException backoffFailure) {
            throw backoffFailure.getCause();
        }
    }

    /**
     * Reads batches from the stream until the thread is interrupted or a non-recoverable
     * failure occurs.
     *
     * <p>The stream is opened lazily on the first iteration. On an {@link IOException} the
     * current input is closed and, unless the thread was interrupted, a new input is obtained
     * through the backoff strategy; the error counter is reset after every successfully read
     * batch. Any other throwable closes the input and is rethrown.
     *
     * <p>Returns normally when the thread is interrupted, either while reading or while waiting
     * for a reconnect. In the latter case the interrupt flag is restored.
     *
     * @throws BackoffException if the backoff strategy fails to reconnect
     */
    void runInternal() throws BackoffException {
        LOG.info("Starting to listen for events for {}", this.eventNames);
        JsonInput jsonInput = null;
        int errorCount = 0;
        while (true) {
            try {
                if (jsonInput == null) {
                    jsonInput = this.openJsonInput();
                }
                final JsonParser jsonParser = jsonInput.getJsonParser();
                if (Thread.currentThread().isInterrupted()) {
                    throw new InterruptedIOException("Interrupted");
                }
                this.readBatch(jsonParser);
                errorCount = 0;
            } catch (IOException e) {
                // Capture the flag first: logging or closing below must not hide an interrupt.
                final boolean wasInterrupted = Thread.currentThread().isInterrupted();
                this.metricsCollector.markErrorWhileConsuming();
                if (errorCount > 0) {
                    LOG.warn("Got [{}] [{}] while reading events for {} after [{}] retries", e.getClass().getSimpleName(), e.getMessage(), this.eventNames, errorCount, e);
                } else {
                    LOG.info("Got [{}] [{}] while reading events for {}", e.getClass().getSimpleName(), e.getMessage(), this.eventNames, e);
                }
                this.closeJsonInput(jsonInput);
                if (wasInterrupted || Thread.currentThread().isInterrupted()) {
                    LOG.warn("Thread was interrupted");
                    return;
                }
                try {
                    LOG.debug("Reconnecting after [{}] errors", errorCount);
                    jsonInput = this.backoffStrategy.call(errorCount, e, this::openJsonInput);
                    LOG.info("Reconnected after [{}] errors", errorCount);
                    this.metricsCollector.markReconnection();
                } catch (InterruptedException interruptedException) {
                    LOG.warn("Interrupted during reconnection", (Throwable) interruptedException);
                    Thread.currentThread().interrupt();
                    return;
                }
                ++errorCount;
            } catch (Throwable e) {
                LOG.warn("Got [{}] [{}] while reading events for {}", e.getClass().getSimpleName(), e.getMessage(), this.eventNames, e);
                try {
                    this.closeJsonInput(jsonInput);
                } catch (Throwable suppressed) {
                    // Attach the close failure to the original error. Passing the error to
                    // itself would raise IllegalArgumentException and mask it.
                    e.addSuppressed(suppressed);
                }
                throw e;
            }
        }
    }

    /**
     * Closes the given input if there is one.
     *
     * @param jsonInput the input to close; {@code null} is ignored
     */
    private void closeJsonInput(@Nullable JsonInput jsonInput) {
        if (jsonInput == null) {
            return;
        }
        jsonInput.close();
    }

    /**
     * Opens the stream, reads exactly one batch and closes the stream again.
     *
     * @throws IOException if opening the stream or reading the batch fails; the failure is
     *                     reported to the metrics collector before it is rethrown
     */
    void readSingleBatch() throws IOException {
        try {
            try (JsonInput input = this.openJsonInput()) {
                this.readBatch(input.getJsonParser());
            }
        } catch (IOException failure) {
            this.metricsCollector.markErrorWhileConsuming();
            throw failure;
        }
    }

    /**
     * Reads one batch object from the parser and processes it.
     *
     * <p>Recognized fields are {@code cursor}, {@code events} and {@code info}; any other field
     * is logged at warn level and skipped. A batch without an {@code events} field is only
     * counted as zero received events; otherwise the events are wrapped in an unmodifiable list
     * and passed on for processing.
     *
     * @param jsonParser parser that is checked via {@code JsonParserHelper.expectToken} for the
     *                   start of the batch object
     * @throws IOException if parsing or processing fails, or the batch has no cursor
     */
    private void readBatch(JsonParser jsonParser) throws IOException {
        LOG.debug("Waiting for next batch of events for {}", this.eventNames);
        JsonParserHelper.expectToken(jsonParser, JsonToken.START_OBJECT);
        this.metricsCollector.markMessageReceived();

        Cursor batchCursor = null;
        List<T> batchEvents = null;
        while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
            final String fieldName = jsonParser.getCurrentName();
            switch (fieldName) {
                case "cursor":
                    batchCursor = this.readCursor(jsonParser);
                    break;
                case "events":
                    batchEvents = this.eventReader.read(jsonParser);
                    break;
                case "info":
                    // The info object is only inspected when its content would be logged.
                    if (LOG.isDebugEnabled()) {
                        this.logStreamInfo(jsonParser);
                    } else {
                        jsonParser.nextToken();
                        jsonParser.skipChildren();
                    }
                    break;
                default:
                    LOG.warn("Unexpected field [{}] in event batch", fieldName);
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                    break;
            }
        }

        if (batchCursor == null) {
            throw new IOException("Could not read cursor");
        }
        final String eventName = this.getCurrentEventName(batchCursor);
        LOG.debug("Cursor for [{}] partition [{}] at offset [{}]", eventName, batchCursor.getPartition(), batchCursor.getOffset());

        if (batchEvents == null) {
            this.metricsCollector.markEventsReceived(0);
            return;
        }
        this.metricsCollector.markEventsReceived(batchEvents.size());
        this.processBatch(new Batch<T>(batchCursor, Collections.unmodifiableList(batchEvents)));
        this.metricsCollector.markMessageSuccessfullyProcessed();
    }

    /**
     * Walks through the {@code info} object of a batch and logs a non-empty {@code debug} text
     * value at debug level. All other fields of the object are skipped.
     */
    private void logStreamInfo(JsonParser jsonParser) throws IOException {
        JsonParserHelper.expectToken(jsonParser, JsonToken.START_OBJECT);
        while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
            final String infoField = jsonParser.getCurrentName();
            switch (infoField) {
                case "debug":
                    final String debugText = jsonParser.nextTextValue();
                    if (debugText != null && !debugText.isEmpty()) {
                        LOG.debug("Stream info: {}", debugText);
                    }
                    break;
                default:
                    jsonParser.nextToken();
                    jsonParser.skipChildren();
                    break;
            }
        }
    }

    /**
     * Couples a response with the JSON parser reading its body, so both can be closed together.
     * The parser is created on first access.
     */
    static class JsonInput implements Closeable {
        private final JsonFactory jsonFactory;
        private final Response response;
        // Created on the first call to getJsonParser(); stays null when never requested.
        private JsonParser jsonParser;

        JsonInput(JsonFactory jsonFactory, Response response) {
            this.jsonFactory = jsonFactory;
            this.response = response;
        }

        /** Returns the response this input reads from. */
        Response getResponse() {
            return this.response;
        }

        /**
         * Returns the parser over the response body, creating it on the first call with
         * {@code AUTO_CLOSE_SOURCE} disabled.
         *
         * @throws IOException if the body cannot be obtained or the parser cannot be created
         */
        JsonParser getJsonParser() throws IOException {
            if (this.jsonParser != null) {
                return this.jsonParser;
            }
            final JsonParser created = this.jsonFactory.createParser(this.response.getBody());
            this.jsonParser = created.disable(JsonParser.Feature.AUTO_CLOSE_SOURCE);
            return this.jsonParser;
        }

        /**
         * Closes the parser, if one was created, and then the response. The response is closed
         * even when closing the parser fails.
         */
        @Override
        public void close() {
            try {
                this.closeParser();
            } finally {
                LOG.trace("Trying to close response");
                this.response.close();
                LOG.trace("Closed response");
            }
        }

        /** Closes the parser if present; an IOException while closing is logged, not thrown. */
        private void closeParser() {
            if (this.jsonParser == null) {
                return;
            }
            try {
                LOG.trace("Trying to close json parser");
                this.jsonParser.close();
                LOG.trace("Closed json parser");
            } catch (IOException closeFailure) {
                LOG.warn("Could not close json parser", (Throwable) closeFailure);
            }
        }
    }

    /**
     * Immutable pair of a cursor and the events that were read together with it.
     *
     * @param <T> the event type
     */
    static final class Batch<T> {
        private final Cursor cursor;
        private final List<T> events;

        /** Stores the given cursor and event list as is; the list is not copied. */
        Batch(Cursor cursor, List<T> events) {
            this.events = events;
            this.cursor = cursor;
        }

        /** Returns the cursor of this batch. */
        Cursor getCursor() {
            return cursor;
        }

        /** Returns the events of this batch. */
        List<T> getEvents() {
            return events;
        }
    }
}

