/*
 * Decompiled with CFR 0.152.
 */
package de.zalando.paradox.nakadi.consumer.core.http.requests;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import de.zalando.paradox.nakadi.consumer.core.ConsumerConfig;
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartitions;
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
import de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler;
import de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveReceiver;
import de.zalando.paradox.nakadi.consumer.core.http.HttpResponseChunk;
import de.zalando.paradox.nakadi.consumer.core.http.okhttp.RxHttpRequest;
import de.zalando.paradox.nakadi.consumer.core.http.requests.HttpGetEventsHandler;
import de.zalando.paradox.nakadi.consumer.core.http.requests.HttpGetPartitions;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionRebalanceListener;
import de.zalando.paradox.nakadi.consumer.core.utils.LoggingUtils;
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import rx.Observable;

public class HttpGetPartitionsHandler
implements HttpReactiveHandler,
PartitionRebalanceListener,
Closeable {
    private final ConcurrentMap<String, HttpReactiveReceiver> partitionReceiver = new ConcurrentHashMap<String, HttpReactiveReceiver>();
    private final ConsumerConfig config;
    private final Logger log;
    private final String baseUri;
    private final EventType eventType;
    private PartitionCoordinator coordinator;
    private HttpGetPartitions httpGetPartitions;
    private final AtomicBoolean rebalanceRegistered = new AtomicBoolean(false);

    public HttpGetPartitionsHandler(ConsumerConfig config) {
        this.config = config;
        this.log = LoggingUtils.getLogger(this.getClass(), config.getEventType());
        this.httpGetPartitions = new HttpGetPartitions(config.getNakadiUrl(), config.getEventType());
        this.baseUri = config.getNakadiUrl();
        this.eventType = config.getEventType();
        this.coordinator = config.getPartitionCoordinator();
    }

    @Override
    public void init() {
        if (this.rebalanceRegistered.compareAndSet(false, true)) {
            this.coordinator.registerRebalanceListener(this.eventType, this);
        }
    }

    @Override
    public void close() {
        if (this.rebalanceRegistered.compareAndSet(true, false)) {
            try {
                List<EventTypePartition> partitions = this.partitionReceiver.keySet().stream().map(partition -> EventTypePartition.of(this.eventType, partition)).collect(Collectors.toList());
                this.log.info("Handler close revokes partitions [{}]", partitions);
                this.onPartitionsRevoked(partitions);
            }
            finally {
                this.coordinator.unregisterRebalanceListener(this.eventType);
            }
        }
    }

    @Override
    public Logger getLogger(Class<?> clazz) {
        return LoggingUtils.getLogger(clazz, "partitions", this.eventType);
    }

    @Override
    public void onResponse(String content) {
        this.log.trace("ResultCallback [{}]", (Object)content);
        Optional<List<NakadiPartition>> nakadiPartitions = this.getPartitions(content);
        if (nakadiPartitions.isPresent()) {
            EventTypePartitions consumerPartitions = EventTypePartitions.of(this.eventType, this.partitionReceiver.keySet());
            this.coordinator.rebalance(consumerPartitions, (Collection<NakadiPartition>)nakadiPartitions.get());
        }
    }

    private Optional<List<NakadiPartition>> getPartitions(String content) {
        try {
            return Optional.of(this.config.getObjectMapper().readValue(content, (TypeReference)new TypeReference<ArrayList<NakadiPartition>>(){}));
        }
        catch (IOException e) {
            this.log.error("Error while parsing partition information", (Throwable)e);
            return Optional.empty();
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<EventTypeCursor> cursors) {
        this.log.trace("onPartitionsAssigned [{}]", cursors);
        cursors.forEach(this::startReceiver);
    }

    @Override
    public void onPartitionsRevoked(Collection<EventTypePartition> partitions) {
        this.log.trace("onPartitionsRevoked [{}]", partitions);
        partitions.forEach(this::stopReceiver);
    }

    @Override
    public void onPartitionsHealthCheck() {
        this.log.trace("onPartitionsHealthCheck");
        this.partitionReceiver.entrySet().forEach(entry -> {
            HttpReactiveReceiver receiver = (HttpReactiveReceiver)entry.getValue();
            if (receiver.isRunning() && !receiver.isSubscribed()) {
                EventTypePartition eventTypePartition = EventTypePartition.of(this.eventType, (String)entry.getKey());
                try {
                    this.log.warn("Receiver for partition [{}] is running but unsubscribed", (Object)eventTypePartition);
                    Thread.sleep(200L);
                }
                catch (InterruptedException e) {
                    ThrowableUtils.throwException(e);
                }
                if (receiver.isRunning() && !receiver.isSubscribed()) {
                    this.log.warn("Force stop receiver for partition [{}]", (Object)eventTypePartition);
                    this.stopReceiver(eventTypePartition);
                }
            }
        });
    }

    private HttpReactiveReceiver stopReceiver(EventTypePartition eventTypePartition) {
        this.checkEventTypePartition(eventTypePartition);
        HttpReactiveReceiver receiver = (HttpReactiveReceiver)this.partitionReceiver.remove(eventTypePartition.getPartition());
        if (null != receiver) {
            try {
                this.log.info("Stopping receiver for partition [{}]", (Object)eventTypePartition);
                receiver.close();
                this.log.info("Receiver for partition [{}] stopped", (Object)eventTypePartition);
            }
            catch (Exception e) {
                this.log.error("Stopping receiver for partition [{}] failed due to [{}]", (Object)eventTypePartition, (Object)ExceptionUtils.getMessage((Throwable)e));
            }
        }
        return receiver;
    }

    private void startReceiver(EventTypeCursor cursor) {
        this.checkEventTypePartition(cursor.getEventTypePartition());
        String partition = cursor.getEventTypePartition().getPartition();
        HttpReactiveReceiver receiver = (HttpReactiveReceiver)this.partitionReceiver.get(partition);
        if (null == receiver) {
            this.newReceiver(cursor);
        } else if (receiver.isRunning() && !receiver.isSubscribed()) {
            try {
                this.log.warn("Receiver for cursor [{}] is running but unsubscribed", (Object)cursor);
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                ThrowableUtils.throwException(e);
            }
            if (receiver.isRunning() && !receiver.isSubscribed()) {
                this.log.warn("Force restart receiver for cursor [{}]", (Object)cursor);
                HttpReactiveReceiver oldReceiver = this.stopReceiver(cursor.getEventTypePartition());
                if (receiver == oldReceiver) {
                    this.newReceiver(cursor);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void newReceiver(EventTypeCursor cursor) {
        block8: {
            this.checkEventTypePartition(cursor.getEventTypePartition());
            String partition = cursor.getEventTypePartition().getPartition();
            HttpReactiveReceiver receiver = null;
            try {
                receiver = new HttpReactiveReceiver(new HttpGetEventsHandler(this.baseUri, cursor, this.config));
                if (null == this.partitionReceiver.putIfAbsent(partition, receiver)) {
                    this.log.info("Starting receiver for cursor [{}]", (Object)cursor);
                    receiver.init();
                    this.log.info("Receiver started for cursor [{}]", (Object)cursor);
                }
            }
            catch (Exception e) {
                this.log.error("Cannot start receiver for cursor [{}] due to [{}]", (Object)cursor, (Object)ExceptionUtils.getMessage((Throwable)e));
                if (null == receiver) break block8;
                try {
                    receiver.close();
                }
                catch (IOException e1) {
                    this.log.error("Stopping receiver for cursor [{}] failed due to [{}]", (Object)cursor, (Object)ExceptionUtils.getMessage((Throwable)e1));
                }
                finally {
                    this.partitionReceiver.remove(partition);
                }
            }
        }
    }

    private void checkEventTypePartition(EventTypePartition p) {
        Preconditions.checkArgument((boolean)this.eventType.equals(p.getEventType()), (String)"Event type mismatch [%s]/[%s]", (Object[])new Object[]{this.eventType, p.getEventType()});
    }

    @Override
    public void onErrorResponse(int statusCode, String content) {
        this.log.trace("Error result [{} / {}]", (Object)statusCode, (Object)content);
    }

    @Override
    public void onStarted() {
        this.log.trace("Started");
    }

    @Override
    public void onFinished() {
        this.log.trace("Finished");
    }

    @Override
    public long getRetryAfterMillis() {
        if (this.config.getPartitionsRetryRandomMillis() > 0L) {
            return this.config.getPartitionsRetryAfterMillis() + ThreadLocalRandom.current().nextLong(this.config.getPartitionsRetryRandomMillis());
        }
        return this.config.getPartitionsRetryAfterMillis();
    }

    @Override
    public Observable<HttpResponseChunk> createRequest() {
        return new RxHttpRequest(this.config.getPartitionsTimeoutMillis(), this.config.getAuthorizationValueProvider()).createRequest(this.httpGetPartitions);
    }
}

