/*
 * Decompiled with CFR 0.152.
 */
package de.zalando.paradox.nakadi.consumer.core.client.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import de.zalando.paradox.nakadi.consumer.core.AuthorizationValueProvider;
import de.zalando.paradox.nakadi.consumer.core.DefaultObjectMapper;
import de.zalando.paradox.nakadi.consumer.core.EventStreamConfig;
import de.zalando.paradox.nakadi.consumer.core.client.Client;
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiEventBatch;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.core.http.HttpResponseChunk;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.EventUtils;
import de.zalando.paradox.nakadi.consumer.core.http.okhttp.RxHttpRequest;
import de.zalando.paradox.nakadi.consumer.core.http.requests.HttpGetEvents;
import de.zalando.paradox.nakadi.consumer.core.http.requests.HttpGetPartitions;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import rx.Observable;
import rx.Single;

/**
 * {@link Client} implementation that queries a Nakadi endpoint over HTTP.
 *
 * <p>Partition and event lookups are issued through {@link RxHttpRequest}; the cursors-lag lookup
 * is a blocking OkHttp POST wrapped in a lazily evaluated {@link Single}. Instances are created
 * through {@link Builder}.
 */
public class ClientImpl implements Client {
    /** Shared Jackson type token for a JSON array of partitions. */
    private static final TypeReference<List<NakadiPartition>> NAKADI_PARTITIONS_TYPE = new TypeReference<List<NakadiPartition>>(){};
    /** Media type of the cursors-lag request body. */
    private static final MediaType CONTENT_TYPE = MediaType.parse("application/json");

    private final String nakadiUrl;
    private final ObjectMapper objectMapper;
    /** May be {@code null}, in which case no {@code Authorization} header is added by the interceptor. */
    private final AuthorizationValueProvider authorizationValueProvider;
    private final long partitionsTimeoutMillis;
    private final long eventsTimeoutMillis;
    private final OkHttpClient okHttpClient;

    /**
     * Copies the configuration out of the builder.
     *
     * @throws NullPointerException if the builder's {@code nakadiUrl} or {@code objectMapper} is null
     */
    private ClientImpl(Builder builder) {
        this.nakadiUrl = Objects.requireNonNull(builder.nakadiUrl, "nakadiUrl must not be null");
        this.objectMapper = Objects.requireNonNull(builder.objectMapper, "objectMapper must not be null");
        this.authorizationValueProvider = builder.authorizationValueProvider;
        this.partitionsTimeoutMillis = builder.partitionsTimeoutMillis;
        this.eventsTimeoutMillis = builder.eventsTimeoutMillis;
        // Created last so that initHttpClient() never runs against a half-initialised instance.
        this.okHttpClient = this.initHttpClient();
    }

    /**
     * Requests the partitions of an event type.
     *
     * @param eventType the event type whose partitions are requested
     * @return a single emitting the partitions parsed from the first response chunk, or an empty
     *         list if the request emits no chunk; a chunk whose status code is not 200 fails the
     *         stream with an {@link IllegalArgumentException}
     */
    @Override
    public Single<List<NakadiPartition>> getPartitions(EventType eventType) {
        HttpGetPartitions httpGetPartitions = new HttpGetPartitions(this.nakadiUrl, eventType);
        Observable<HttpResponseChunk> chunks = new RxHttpRequest(this.partitionsTimeoutMillis, this.authorizationValueProvider).createRequest(httpGetPartitions);
        return chunks
            // The filter never drops a chunk; it only validates it and throws on a bad status.
            .filter(chunk -> {
                Preconditions.checkArgument(chunk.getStatusCode() == 200, "Get partitions for event [%s] , result [%s / %s]", eventType, chunk.getStatusCode(), chunk.getContent());
                return true;
            })
            .map(chunk -> this.getPartitions(chunk.getContent()))
            .firstOrDefault(Collections.emptyList())
            .toSingle();
    }

    /**
     * Deserializes a JSON array of partitions.
     *
     * @param content JSON text to parse
     * @return the parsed partitions
     */
    private List<NakadiPartition> getPartitions(String content) {
        try {
            return this.objectMapper.readValue(content, NAKADI_PARTITIONS_TYPE);
        }
        catch (IOException e) {
            // NOTE(review): throwException is presumably expected to rethrow, making the return
            // below unreachable in practice and present only for the compiler - confirm.
            ThrowableUtils.throwException(e);
            return null;
        }
    }

    /**
     * Reads the single event found at the given cursor.
     *
     * @param cursor the position to read from
     * @return a single emitting the only event of the first response chunk, or {@code null} if the
     *         request emits no chunk
     */
    @Override
    public Single<String> getEvent(EventTypeCursor cursor) {
        Observable<HttpResponseChunk> chunks = this.getContent0(cursor, 1);
        return chunks
            .map(chunk -> this.getEvent0(chunk.getContent(), cursor.getEventType()))
            .firstOrDefault(null)
            .toSingle();
    }

    /**
     * Reads the raw response content found at the given cursor.
     *
     * @param cursor the position to read from
     * @return a single emitting the content of the first response chunk, or {@code null} if the
     *         request emits no chunk
     */
    @Override
    public Single<String> getContent(EventTypeCursor cursor) {
        Observable<HttpResponseChunk> chunks = this.getContent0(cursor, 1);
        return chunks
            .map(HttpResponseChunk::getContent)
            .firstOrDefault(null)
            .toSingle();
    }

    /**
     * Posts the given cursors to the {@code cursors-lag} resource of their event type.
     *
     * <p>The arguments are validated eagerly; the HTTP call itself runs when the returned single is
     * subscribed to.
     *
     * @param cursors non-empty list of cursors that all belong to the same event type
     * @return a single emitting the partitions parsed from a successful response; it fails with a
     *         {@link RuntimeException} carrying the response body if the response is not successful
     * @throws NullPointerException if {@code cursors} is null
     * @throws IllegalArgumentException if {@code cursors} is empty or spans several event types
     */
    @Override
    public Single<List<NakadiPartition>> getCursorsLag(List<EventTypeCursor> cursors) {
        Preconditions.checkNotNull(cursors, "cursors must not be null");
        Preconditions.checkArgument(!cursors.isEmpty(), "cursors must not be empty");
        Preconditions.checkArgument(cursors.stream().map(EventTypeCursor::getEventType).distinct().count() == 1L, "cursors must contain cursors of only one type");
        return Single.fromCallable(() -> {
            EventType eventType = cursors.get(0).getEventType();
            HttpUrl baseUrl = HttpUrl.parse(this.nakadiUrl);
            if (baseUrl == null) {
                // HttpUrl.parse signals an unparseable URL with null; fail with a descriptive error
                // instead of a bare NullPointerException.
                throw new IllegalStateException(String.format("nakadiUrl [%s] is not a valid HTTP(S) URL", this.nakadiUrl));
            }
            HttpUrl httpUrl = baseUrl.newBuilder()
                .addPathSegment("event-types")
                .addPathSegment(eventType.getName())
                .addPathSegment("cursors-lag")
                .build();
            Request request = new Request.Builder()
                .url(httpUrl)
                .post(RequestBody.create(CONTENT_TYPE, this.getNakadiCursors(cursors)))
                .build();
            Response response = this.okHttpClient.newCall(request).execute();
            try {
                if (response.isSuccessful()) {
                    List<NakadiPartition> partitions = this.objectMapper.readValue(response.body().byteStream(), NAKADI_PARTITIONS_TYPE);
                    return partitions;
                }
                throw new RuntimeException(String.format("Get cursors lag failed for cursors [%s]: [%s]", cursors, response.body().string()));
            }
            finally {
                // Always release the connection, independent of whether the configured
                // ObjectMapper closes its input stream or parsing fails half-way.
                response.body().close();
            }
        });
    }

    /**
     * Serializes the partition/offset pairs of the given cursors to JSON.
     *
     * @param cursors cursors to convert
     * @return JSON text of the resulting {@link NakadiCursor} list
     * @throws JsonProcessingException if serialization fails
     */
    private String getNakadiCursors(List<EventTypeCursor> cursors) throws JsonProcessingException {
        List<NakadiCursor> nakadiCursors = cursors.stream()
            .map(cursor -> new NakadiCursor(cursor.getPartition(), cursor.getOffset()))
            .collect(Collectors.toList());
        return this.objectMapper.writeValueAsString(nakadiCursors);
    }

    /**
     * Builds the events request for a cursor and attaches response validation.
     *
     * @param cursor the position to read from
     * @param streamLimit stream limit passed to the {@link EventStreamConfig}; must be positive
     * @return the response chunks; a chunk with a status code other than 200 or with empty content
     *         fails the stream with an {@link IllegalArgumentException}
     * @throws IllegalArgumentException if {@code streamLimit} is not greater than 0
     */
    private Observable<HttpResponseChunk> getContent0(EventTypeCursor cursor, int streamLimit) {
        Preconditions.checkArgument(streamLimit > 0, "streamLimit must be greater than 0");
        EventStreamConfig eventStreamConfig = new EventStreamConfig.Builder().withStreamLimit(streamLimit).build();
        HttpGetEvents httpGetEvents = new HttpGetEvents(this.nakadiUrl, cursor, eventStreamConfig);
        return new RxHttpRequest(this.eventsTimeoutMillis, this.authorizationValueProvider)
            .createRequest(httpGetEvents)
            // The filter never drops a chunk; it only validates it and throws on a bad one.
            .filter(chunk -> {
                Preconditions.checkArgument(chunk.getStatusCode() == 200, "Get for cursor [%s] , result [%s / %s]", cursor, chunk.getStatusCode(), chunk.getContent());
                Preconditions.checkArgument(StringUtils.isNotEmpty(chunk.getContent()), "Event not found for cursor [%s]", cursor);
                return true;
            });
    }

    /**
     * Extracts the one event contained in a raw batch.
     *
     * @param content raw batch content
     * @param eventType event type the batch belongs to
     * @return the only event of the batch
     * @throws IllegalArgumentException if no batch could be obtained from {@code content}
     */
    private String getEvent0(String content, EventType eventType) {
        NakadiEventBatch<String> events = EventUtils.getRawEventBatch(this.objectMapper, content, eventType);
        Preconditions.checkArgument(events != null);
        return (String) Iterables.getOnlyElement(events.getEvents());
    }

    /** Returns the object mapper this client was configured with. */
    ObjectMapper getObjectMapper() {
        return this.objectMapper;
    }

    /**
     * Creates the OkHttp client used for the cursors-lag call.
     *
     * @return a client with the authorization interceptor installed
     */
    @VisibleForTesting
    OkHttpClient initHttpClient() {
        return new OkHttpClient.Builder().addInterceptor(this.getAuthorizationInterceptor()).build();
    }

    /**
     * Returns an interceptor that adds an {@code Authorization} header when a provider is set.
     *
     * <p>The provider is read on every intercepted request rather than when the interceptor is
     * created.
     */
    private Interceptor getAuthorizationInterceptor() {
        return chain -> {
            if (this.authorizationValueProvider == null) {
                return chain.proceed(chain.request());
            }
            Request authorized = chain.request().newBuilder()
                .addHeader("Authorization", (String) this.authorizationValueProvider.get())
                .build();
            return chain.proceed(authorized);
        };
    }

    /**
     * Fluent builder for {@link ClientImpl}.
     *
     * <p>Both timeouts default to 10 seconds and the object mapper defaults to the one supplied by
     * {@link DefaultObjectMapper}; no authorization provider is set by default.
     */
    public static class Builder {
        private final String nakadiUrl;
        private ObjectMapper objectMapper = new DefaultObjectMapper().jacksonObjectMapper();
        private long eventsTimeoutMillis = TimeUnit.SECONDS.toMillis(10L);
        private long partitionsTimeoutMillis = TimeUnit.SECONDS.toMillis(10L);
        private AuthorizationValueProvider authorizationValueProvider;

        /**
         * @param nakadiUrl base URL of the Nakadi endpoint; checked for null in {@link #build()}
         */
        public Builder(String nakadiUrl) {
            this.nakadiUrl = nakadiUrl;
        }

        /** Static factory equivalent to {@link #Builder(String)}. */
        public static Builder of(String nakadiUrl) {
            return new Builder(nakadiUrl);
        }

        /** Replaces the default object mapper; a null value makes {@link #build()} fail. */
        public Builder withObjectMapper(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
            return this;
        }

        /** Sets the provider of the {@code Authorization} header value; may be null. */
        public Builder withAuthorization(AuthorizationValueProvider authorizationValueProvider) {
            this.authorizationValueProvider = authorizationValueProvider;
            return this;
        }

        /** Sets the timeout, in milliseconds, handed to event requests. */
        public Builder withEventsTimeoutMillis(long eventsTimeoutMillis) {
            this.eventsTimeoutMillis = eventsTimeoutMillis;
            return this;
        }

        /** Sets the timeout, in milliseconds, handed to partition requests. */
        public Builder withPartitionsTimeoutMillis(long partitionsTimeoutMillis) {
            this.partitionsTimeoutMillis = partitionsTimeoutMillis;
            return this;
        }

        /**
         * Creates the client.
         *
         * @throws NullPointerException if the URL or the object mapper is null
         */
        public ClientImpl build() {
            return new ClientImpl(this);
        }
    }
}

