/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.spring.boot.nakadi.config;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Set;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.util.StringUtils;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.Listener;
import org.zalando.fahrschein.NakadiClient;
import org.zalando.fahrschein.StreamParameters;
import org.zalando.fahrschein.SubscriptionBuilder;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.spring.boot.nakadi.NakadiConsumer;
import org.zalando.spring.boot.nakadi.config.NakadiClientsProperties;
import org.zalando.spring.boot.nakadi.events.NakadiSubscriptionEvent;

class DefaultNakadiConsumer
implements NakadiConsumer,
BeanNameAware,
ApplicationEventPublisherAware {
    private final NakadiClient nakadiClient;
    private final NakadiClientsProperties.Client.NakadiConsumerConfig consumerConfig;
    private final NakadiClientsProperties.Client.NakadiConsumerDefaults consumerDefaults;
    private String beanName;
    private ApplicationEventPublisher eventPublisher;

    DefaultNakadiConsumer(NakadiClient client, NakadiClientsProperties.Client.NakadiConsumerConfig consumerConfig, NakadiClientsProperties.Client.NakadiConsumerDefaults consumerDefaults) {
        this.nakadiClient = client;
        this.consumerConfig = consumerConfig;
        this.consumerDefaults = consumerDefaults;
    }

    @Override
    public <Type> void listen(Class<Type> clazz, Listener<Type> listener) {
        try {
            Subscription sub = this.getSubscription();
            StreamParameters streamParams = this.getStreamParameters();
            this.nakadiClient.stream(sub).withStreamParameters(streamParams).listen(clazz, listener);
            this.eventPublisher.publishEvent((Object)new NakadiSubscriptionEvent(this.beanName, sub, streamParams, clazz.getName(), listener.getClass().getName()));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Subscription getSubscription() throws IOException {
        SubscriptionBuilder sb = this.nakadiClient.subscription(this.getApplicationName(), (Set)Sets.newHashSet((Iterable)this.consumerConfig.getTopics())).withConsumerGroup(this.getConsumerGroup());
        sb = NakadiClientsProperties.Position.END.equals((Object)this.consumerConfig.getReadFrom()) ? sb.readFromEnd() : sb.readFromBegin();
        return sb.subscribe();
    }

    protected StreamParameters getStreamParameters() {
        if (this.consumerConfig.getStreamParameters() == null && this.consumerDefaults.getStreamParameters() == null) {
            return new StreamParameters();
        }
        NakadiClientsProperties.StreamParametersConfig config = this.consumerConfig.getStreamParameters();
        if (config == null) {
            config = this.consumerDefaults.getStreamParameters();
        }
        StreamParameters sp = new StreamParameters();
        if (config.getBatchFlushTimeout() != null) {
            sp = sp.withBatchFlushTimeout(config.getBatchFlushTimeout().intValue());
        }
        if (config.getBatchLimit() != null) {
            sp = sp.withBatchLimit(config.getBatchLimit().intValue());
        }
        if (config.getMaxUncommittedEvents() != null) {
            sp = sp.withMaxUncommittedEvents(config.getMaxUncommittedEvents().intValue());
        }
        if (config.getStreamKeepAliveLimit() != null) {
            sp = sp.withStreamKeepAliveLimit(config.getStreamKeepAliveLimit().intValue());
        }
        if (config.getStreamLimit() != null) {
            sp = sp.withStreamLimit(config.getStreamLimit().intValue());
        }
        if (config.getStreamTimeout() != null) {
            sp = sp.withStreamTimeout(config.getStreamTimeout().intValue());
        }
        return sp;
    }

    @Override
    public <Type> IORunnable runnable(Class<Type> clazz, Listener<Type> listener) throws IOException {
        Subscription sub = this.getSubscription();
        StreamParameters streamParams = this.getStreamParameters();
        IORunnable result = this.nakadiClient.stream(sub).withStreamParameters(streamParams).runnable(clazz, listener);
        this.eventPublisher.publishEvent((Object)new NakadiSubscriptionEvent(this.beanName, sub, streamParams, clazz.getName(), listener.getClass().getName()));
        return result;
    }

    protected String getApplicationName() {
        return this.getValueOrDefaultElseThrow(this.consumerConfig.getApplicationName(), this.consumerDefaults.getApplicationName(), new RuntimeException("'applicationName' is required"));
    }

    protected String getConsumerGroup() {
        return this.getValueOrDefaultElseThrow(this.consumerConfig.getConsumerGroup(), this.consumerDefaults.getConsumerGroup(), new RuntimeException("'consumerGroup' is required"));
    }

    protected String getValueOrDefaultElseThrow(String value, String defaultValue, RuntimeException t) {
        if (StringUtils.hasText((String)value)) {
            return value;
        }
        if (StringUtils.hasText((String)defaultValue)) {
            return defaultValue;
        }
        throw t;
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }
}

