/*
 * Decompiled with CFR 0.152.
 */
package de.zalando.paradox.nakadi.consumer.boot.components;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import de.zalando.paradox.nakadi.consumer.boot.EventReceiverRegistryConfiguration;
import de.zalando.paradox.nakadi.consumer.boot.components.ConsumerEventConfig;
import de.zalando.paradox.nakadi.consumer.boot.components.ConsumerPartitionCoordinatorProvider;
import de.zalando.paradox.nakadi.consumer.boot.components.EventTypeConsumer;
import de.zalando.paradox.nakadi.consumer.core.AuthorizationValueProvider;
import de.zalando.paradox.nakadi.consumer.core.ConsumerConfig;
import de.zalando.paradox.nakadi.consumer.core.EventHandler;
import de.zalando.paradox.nakadi.consumer.core.EventStreamConfig;
import de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveHandler;
import de.zalando.paradox.nakadi.consumer.core.http.HttpReactiveReceiver;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.BatchEventsBulkHandler;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.BatchEventsHandler;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.JsonEventBulkHandler;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.JsonEventHandler;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.RawContentHandler;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.RawEventBulkHandler;
import de.zalando.paradox.nakadi.consumer.core.http.handlers.RawEventHandler;
import de.zalando.paradox.nakadi.consumer.core.http.requests.HttpGetPartitionsHandler;
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registry of Nakadi event receivers.
 *
 * <p>For every {@link ConsumerEventConfig} supplied by the configuration, one
 * {@link HttpReactiveReceiver} is created and initialised, keyed by its
 * {@link EventTypeConsumer} (event name + consumer name). Partition coordinators are
 * obtained from the configured {@link ConsumerPartitionCoordinatorProvider} and cached
 * so that they can be shared between receivers and closed / re-initialised together.
 */
public class EventReceiverRegistry {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventReceiverRegistry.class);

    /** Time in milliseconds that {@link #stop()} blocks after closing all coordinators and receivers. */
    private static final long STOP_WAIT_MILLIS = 2000L;

    /** Starts as {@code true}; set to {@code false} by {@link #destroy()} to disable {@link #restart()}. */
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ConcurrentMap<EventTypeConsumer, HttpReactiveReceiver> receiverMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<EventTypeConsumer, EventHandler<?>> handlerMap = new ConcurrentHashMap<>();
    /** Coordinators keyed by (consumerName, eventName) or (consumerName, consumerName), see {@link #getPartitionCoordinator}. */
    private final ConcurrentMap<Pair<String, String>, PartitionCoordinator> coordinatorMap = new ConcurrentHashMap<>();
    private final EventReceiverRegistryConfiguration config;
    private final String nakadiUrl;
    private final ConsumerPartitionCoordinatorProvider coordinatorProvider;
    private final List<ConsumerEventConfig> consumerEventConfigList;
    private final AuthorizationValueProvider authorizationValueProvider;
    private final ObjectMapper objectMapper;

    /**
     * Creates the registry; receivers are not started until {@link #init()} is called.
     *
     * @param config       registry configuration; its Nakadi URL, coordinator provider and
     *                     consumer event config list must not be {@code null}
     * @param objectMapper optional mapper passed to every consumer configuration; when
     *                     {@code null} the consumer builder is left untouched
     * @throws NullPointerException if {@code config} or one of its required values is {@code null}
     */
    public EventReceiverRegistry(EventReceiverRegistryConfiguration config, @Nullable ObjectMapper objectMapper) {
        this.config = Objects.requireNonNull(config, "config must not be null");
        this.nakadiUrl = Objects.requireNonNull(config.getNakadiUrl(), "nakadiUrl must not be null");
        this.coordinatorProvider = Objects.requireNonNull(config.getCoordinatorProvider(), "coordinatorProvider must not be null");
        // Check the wrapper as well as the wrapped list so both failure modes carry a message.
        this.consumerEventConfigList = Objects.requireNonNull(
                Objects.requireNonNull(config.getConsumerEventConfigList(), "consumerEventConfigList must not be null").getList(),
                "consumerEventConfigList must not be null");
        this.authorizationValueProvider = config.getAuthorizationValueProvider();
        this.objectMapper = objectMapper;
    }

    /**
     * Starts one receiver per configured consumer event, or logs a warning when none are configured.
     */
    @PostConstruct
    public void init() {
        if (this.consumerEventConfigList.isEmpty()) {
            LOGGER.warn("Nakadi event receivers are not configured");
        } else {
            this.consumerEventConfigList.forEach(this::startReceiver);
        }
    }

    /** Unpacks a {@link ConsumerEventConfig} and delegates to the three-argument overload. */
    private void startReceiver(ConsumerEventConfig consumerEventConfig) {
        this.startReceiver(consumerEventConfig.getConsumerName(), consumerEventConfig.getEventName(), consumerEventConfig.getHandler());
    }

    /**
     * Builds the consumer configuration for one consumer/event pair, registers the receiver
     * and handler, and initialises the receiver.
     *
     * @throws IllegalArgumentException if a name is empty or the handler is {@code null}
     * @throws IllegalStateException    if a receiver is already registered for the same pair
     */
    private void startReceiver(String consumerName, String eventName, EventHandler<?> handler) {
        Preconditions.checkArgument(StringUtils.isNotEmpty(consumerName), "consumerName must not be empty");
        Preconditions.checkArgument(StringUtils.isNotEmpty(eventName), "eventName must not be empty");
        Preconditions.checkArgument(null != handler, "handler must not be null");

        EventTypeConsumer etc = new EventTypeConsumer(eventName, consumerName);
        PartitionCoordinator coordinator = this.getPartitionCoordinator(consumerName, eventName);

        ConsumerConfig.Builder builder = ConsumerConfig.Builder.of(this.nakadiUrl, eventName, coordinator);
        // Optional settings are only applied when configured, so the builder keeps its own values otherwise.
        if (null != this.authorizationValueProvider) {
            builder = builder.withAuthorization(this.authorizationValueProvider);
        }
        if (null != this.objectMapper) {
            builder = builder.withObjectMapper(this.objectMapper);
        }
        if (null != this.config.getEventsRetryAfterMillis()) {
            builder = builder.withEventsRetryAfterMillis(this.config.getEventsRetryAfterMillis().longValue());
        }
        if (null != this.config.getEventsTimeoutMillis()) {
            builder = builder.withEventsTimeoutMillis(this.config.getEventsTimeoutMillis().longValue());
        }
        if (null != this.config.getEventsRetryRandomMillis()) {
            builder = builder.withEventsRetryRandomMillis(this.config.getEventsRetryRandomMillis().longValue());
        }
        if (null != this.config.getPartitionsRetryAfterMillis()) {
            builder = builder.withPartitionsRetryAfterMillis(this.config.getPartitionsRetryAfterMillis().longValue());
        }
        if (null != this.config.getPartitionsTimeoutMillis()) {
            builder = builder.withPartitionsTimeoutMillis(this.config.getPartitionsTimeoutMillis().longValue());
        }
        if (null != this.config.getPartitionsRetryRandomMillis()) {
            builder = builder.withPartitionsRetryRandomMillis(this.config.getPartitionsRetryRandomMillis().longValue());
        }
        builder.withEventStreamConfig(EventStreamConfig.Builder.of()
                .withBatchLimit(this.config.getEventsBatchLimit())
                .withBatchTimeoutSeconds(this.config.getEventsBatchTimeoutSeconds())
                .withStreamLimit(this.config.getEventsStreamLimit())
                .withStreamTimeoutSeconds(this.config.getEventsStreamTimeoutSeconds())
                .withStreamKeepAliveLimit(this.config.getEventsStreamKeepAliveLimit())
                .build());

        // Named consumerConfig (not config) to avoid shadowing the registry configuration field.
        ConsumerConfig consumerConfig = this.withEventHandler(builder, handler).build();
        HttpReactiveHandler partitionsHandler = new HttpGetPartitionsHandler(consumerConfig);
        HttpReactiveReceiver receiver = new HttpReactiveReceiver(partitionsHandler);

        // Fail fast on a duplicate pair before the new receiver is initialised.
        Preconditions.checkState(null == this.receiverMap.putIfAbsent(etc, receiver), "Duplicated configuration for [%s]", etc);
        this.handlerMap.putIfAbsent(etc, handler);
        LOGGER.info("Starting receiver for consumerName [{}] for event type [{}]", consumerName, consumerConfig.getEventType());
        receiver.init();
    }

    /**
     * Registers {@code handler} on {@code builder} under every supported handler interface it
     * implements. The checks are independent, so a handler implementing several interfaces is
     * registered once per interface; a handler implementing none leaves the builder unchanged.
     *
     * @return the same {@code builder} instance that was passed in
     */
    private ConsumerConfig.Builder withEventHandler(ConsumerConfig.Builder builder, EventHandler<?> handler) {
        if (handler instanceof BatchEventsHandler) {
            builder.withBatchEventsHandler((BatchEventsHandler)handler);
        }
        if (handler instanceof BatchEventsBulkHandler) {
            builder.withBatchEventsBulkHandler((BatchEventsBulkHandler)handler);
        }
        if (handler instanceof RawContentHandler) {
            builder.withRawContentHandler((RawContentHandler)handler);
        }
        if (handler instanceof RawEventHandler) {
            builder.withRawEventHandler((RawEventHandler)handler);
        }
        if (handler instanceof RawEventBulkHandler) {
            builder.withRawEventBulkHandler((RawEventBulkHandler)handler);
        }
        if (handler instanceof JsonEventHandler) {
            builder.withJsonEventHandler((JsonEventHandler)handler);
        }
        if (handler instanceof JsonEventBulkHandler) {
            builder.withJsonEventBulkHandler((JsonEventBulkHandler)handler);
        }
        return builder;
    }

    /**
     * Returns the cached coordinator for the given consumer/event pair, creating and
     * initialising it on first use.
     *
     * <p>When the configuration enables per-event-type coordinators the cache key is
     * (consumerName, eventName); otherwise it is (consumerName, consumerName), so all event
     * types of one consumer share a coordinator.
     */
    private PartitionCoordinator getPartitionCoordinator(String consumerName, String eventName) {
        Pair<String, String> key = this.config.isEventTypePartitionCoordinator()
                ? Pair.of(consumerName, eventName)
                : Pair.of(consumerName, consumerName);
        PartitionCoordinator coordinator = this.coordinatorMap.get(key);
        if (null == coordinator) {
            coordinator = this.coordinatorProvider.getPartitionCoordinator(consumerName);
            PartitionCoordinator oldCoordinator = this.coordinatorMap.putIfAbsent(key, coordinator);
            if (null == oldCoordinator) {
                // Only the instance that actually entered the map is initialised.
                coordinator.init();
            } else {
                // Another caller registered a coordinator first; use theirs.
                // NOTE(review): the freshly created instance is dropped without close() - confirm that is safe.
                coordinator = oldCoordinator;
            }
        }
        return coordinator;
    }

    /**
     * Marks the registry as no longer running (which disables {@link #restart()}) and stops
     * all coordinators and receivers.
     */
    @PreDestroy
    public void destroy() throws IOException, InterruptedException {
        this.running.set(false);
        this.stop();
    }

    /**
     * Closes every coordinator, then every receiver, and finally blocks for
     * {@value #STOP_WAIT_MILLIS} ms. Exceptions thrown by an individual {@code close()} are
     * logged and do not prevent the remaining components from being closed.
     *
     * @throws InterruptedException if the calling thread is interrupted during the final wait
     */
    public void stop() throws InterruptedException {
        this.coordinatorMap.values().forEach(coordinator -> {
            try {
                coordinator.close();
            } catch (Exception e) {
                LOGGER.error("Unexpected error while closing coordinator", e);
            }
        });
        this.receiverMap.values().forEach(receiver -> {
            try {
                receiver.close();
            } catch (Exception e) {
                LOGGER.error("Unexpected error while closing receiver", e);
            }
        });
        // NOTE(review): presumably gives closed components time to wind down - confirm the reason for the wait.
        Thread.sleep(STOP_WAIT_MILLIS);
    }

    /**
     * Re-initialises every registered receiver and then every coordinator, unless
     * {@link #destroy()} has been called, in which case only a warning is logged.
     * Exceptions thrown by an individual {@code init()} are logged and do not stop the
     * remaining components from being re-initialised.
     */
    public void restart() {
        if (this.running.get()) {
            LOGGER.info("Restart receivers");
            // NOTE(review): receivers are initialised before coordinators here, the reverse of
            // first start-up in startReceiver - confirm this ordering is intended.
            this.receiverMap.values().forEach(receiver -> {
                try {
                    receiver.init();
                } catch (Exception e) {
                    LOGGER.error("Unexpected error while reinit receiver", e);
                }
            });
            this.coordinatorMap.values().forEach(coordinator -> {
                try {
                    coordinator.init();
                } catch (Exception e) {
                    LOGGER.error("Unexpected error while reinit coordinator", e);
                }
            });
        } else {
            LOGGER.warn("Restart receivers not possible as not running");
        }
    }

    /**
     * @return an unmodifiable view of the consumer/event pairs that have a registered handler
     */
    public Set<EventTypeConsumer> getEventTypeConsumers() {
        return Collections.unmodifiableSet(this.handlerMap.keySet());
    }

    /**
     * @param eventTypeConsumer the consumer/event pair to look up
     * @return the handler registered for the pair, or {@code null} if there is none
     */
    public EventHandler<?> getEventTypeConsumerHandler(EventTypeConsumer eventTypeConsumer) {
        return this.handlerMap.get(eventTypeConsumer);
    }
}

