/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.spring.boot.nakadi.config;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.Listener;
import org.zalando.fahrschein.NakadiClient;
import org.zalando.fahrschein.StreamParameters;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.spring.boot.nakadi.NakadiConsumer;
import org.zalando.spring.boot.nakadi.config.NakadiClientsProperties;

class DefaultNakadiConsumer
implements NakadiConsumer {
    private final NakadiClient nakadiClient;
    private final NakadiClientsProperties.Client.NakadiConsumerConfig consumerConfig;

    DefaultNakadiConsumer(NakadiClient client, NakadiClientsProperties.Client.NakadiConsumerConfig consumerConfig) {
        this.nakadiClient = client;
        this.consumerConfig = consumerConfig;
    }

    @Override
    public <Type> void listen(Class<Type> clazz, Listener<Type> listener) {
        try {
            this.nakadiClient.stream(this.getSubscription()).withStreamParameters(this.getStreamParameters(this.consumerConfig)).listen(clazz, listener);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    private Subscription getSubscription() throws IOException {
        return this.nakadiClient.subscription(this.consumerConfig.getApplicationName(), (Set)Sets.newHashSet((Iterable)this.consumerConfig.getTopics())).withConsumerGroup(this.consumerConfig.getConsumerGroup()).readFromBegin().subscribe();
    }

    private StreamParameters getStreamParameters(NakadiClientsProperties.Client.NakadiConsumerConfig config) {
        return new StreamParameters().withBatchFlushTimeout(config.getStreamParameters().getBatchFlushTimeout().intValue()).withBatchLimit(config.getStreamParameters().getBatchLimit().intValue()).withMaxUncommittedEvents(config.getStreamParameters().getMaxUncommittedEvents().intValue()).withStreamKeepAliveLimit(config.getStreamParameters().getStreamKeepAliveLimit().intValue()).withStreamLimit(config.getStreamParameters().getStreamLimit().intValue()).withStreamTimeout(config.getStreamParameters().getStreamTimeout().intValue());
    }

    @Override
    public <Type> IORunnable runnable(Class<Type> clazz, Listener<Type> listener) throws IOException {
        return this.nakadiClient.stream(this.getSubscription()).withStreamParameters(this.getStreamParameters(this.consumerConfig)).runnable(clazz, listener);
    }
}

