/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.spring.boot.nakadi.config;

import com.google.common.collect.Sets;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Set;
import lombok.NonNull;
import org.zalando.fahrschein.AuthorizationBuilder;
import org.zalando.fahrschein.BackoffStrategy;
import org.zalando.fahrschein.EqualJitterBackoffStrategy;
import org.zalando.fahrschein.ExponentialBackoffStrategy;
import org.zalando.fahrschein.FullJitterBackoffStrategy;
import org.zalando.fahrschein.IORunnable;
import org.zalando.fahrschein.MetricsCollector;
import org.zalando.fahrschein.NakadiClient;
import org.zalando.fahrschein.NoBackoffStrategy;
import org.zalando.fahrschein.NoMetricsCollector;
import org.zalando.fahrschein.StreamParameters;
import org.zalando.fahrschein.SubscriptionBuilder;
import org.zalando.fahrschein.domain.Authorization;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.spring.boot.nakadi.MeterRegistryAware;
import org.zalando.spring.boot.nakadi.NakadiListener;
import org.zalando.spring.boot.nakadi.config.BackoffConfig;
import org.zalando.spring.boot.nakadi.config.ConsumerConfig;
import org.zalando.spring.boot.nakadi.config.JitterType;
import org.zalando.spring.boot.nakadi.config.MicrometerMetricsCollector;
import org.zalando.spring.boot.nakadi.config.NakadiConsumer;
import org.zalando.spring.boot.nakadi.config.Position;
import org.zalando.spring.boot.nakadi.config.StreamParametersConfig;

public class FahrscheinNakadiConsumer
implements NakadiConsumer,
MeterRegistryAware {
    @NonNull
    private final NakadiClient nakadiClient;
    @NonNull
    private final ConsumerConfig consumerConfig;
    private MeterRegistry meterRegistry;

    @Override
    public ConsumerConfig getConsumerConfig() {
        return this.consumerConfig;
    }

    @Override
    public void setMeterRegistry(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @Override
    public <Type> IORunnable runnable(NakadiListener<Type> listener) throws IOException {
        Subscription sub = this.getSubscription();
        StreamParameters streamParams = this.getStreamParameters();
        IORunnable result = this.nakadiClient.stream(sub).withStreamParameters(streamParams).withBackoffStrategy(FahrscheinNakadiConsumer.getBackoffStrategy(this.consumerConfig)).withMetricsCollector(FahrscheinNakadiConsumer.getMetricsCollector(this.consumerConfig, this.meterRegistry)).runnable(listener.getEventType(), listener);
        return result;
    }

    protected static MetricsCollector getMetricsCollector(ConsumerConfig consumerConfig, MeterRegistry meterRegistry) {
        if (meterRegistry != null && consumerConfig.getRecordMetrics().booleanValue()) {
            return new MicrometerMetricsCollector(meterRegistry, consumerConfig.getId());
        }
        return new NoMetricsCollector();
    }

    protected static BackoffStrategy getBackoffStrategy(ConsumerConfig consumerConfig) {
        if (consumerConfig.getBackoff().getEnabled().booleanValue()) {
            BackoffConfig c = consumerConfig.getBackoff();
            if (c.getJitter().getEnabled().booleanValue() && c.getJitter().getType().equals((Object)JitterType.EQUAL)) {
                return new EqualJitterBackoffStrategy((int)c.getInitialDelay().getUnit().toMillis(c.getInitialDelay().getAmount()), c.getBackoffFactor().doubleValue(), c.getMaxDelay().getUnit().toMillis(c.getMaxDelay().getAmount()), c.getMaxRetries().intValue());
            }
            if (c.getJitter().getEnabled().booleanValue() && c.getJitter().getType().equals((Object)JitterType.FULL)) {
                return new FullJitterBackoffStrategy((int)c.getInitialDelay().getUnit().toMillis(c.getInitialDelay().getAmount()), c.getBackoffFactor().doubleValue(), c.getMaxDelay().getUnit().toMillis(c.getMaxDelay().getAmount()), c.getMaxRetries().intValue());
            }
            return new ExponentialBackoffStrategy((int)c.getInitialDelay().getUnit().toMillis(c.getInitialDelay().getAmount()), c.getBackoffFactor().doubleValue(), c.getMaxDelay().getUnit().toMillis(c.getMaxDelay().getAmount()), c.getMaxRetries().intValue());
        }
        return new NoBackoffStrategy();
    }

    private Subscription getSubscription() throws IOException {
        LinkedList adminAttributes = new LinkedList();
        this.consumerConfig.getAuthorizations().getAdmins().forEach((key, value) -> adminAttributes.add(new Authorization.AuthorizationAttribute(key, value)));
        LinkedList<Authorization.AuthorizationAttribute> readerAttributes = new LinkedList<Authorization.AuthorizationAttribute>();
        this.consumerConfig.getAuthorizations().getReaders().forEach((key, value) -> readerAttributes.add(new Authorization.AuthorizationAttribute(key, value)));
        if (this.consumerConfig.getAuthorizations().getAnyReader().booleanValue()) {
            readerAttributes.add(Authorization.AuthorizationAttribute.ANYONE);
        }
        SubscriptionBuilder sb = this.nakadiClient.subscription(this.consumerConfig.getApplicationName(), (Set)Sets.newHashSet((Iterable)this.consumerConfig.getTopics())).withConsumerGroup(this.consumerConfig.getConsumerGroup()).withAuthorization(AuthorizationBuilder.authorization().withAdmins(adminAttributes).withReaders(readerAttributes).build());
        sb = Position.END.equals((Object)this.consumerConfig.getReadFrom()) ? sb.readFromEnd() : sb.readFromBegin();
        return sb.subscribe();
    }

    protected StreamParameters getStreamParameters() {
        if (this.consumerConfig.getStreamParameters() == null) {
            return new StreamParameters();
        }
        StreamParametersConfig config = this.consumerConfig.getStreamParameters();
        StreamParameters sp = new StreamParameters();
        if (config.getBatchFlushTimeout() != null) {
            sp = sp.withBatchFlushTimeout(config.getBatchFlushTimeout().intValue());
        }
        if (config.getBatchLimit() != null) {
            sp = sp.withBatchLimit(config.getBatchLimit().intValue());
        }
        if (config.getMaxUncommittedEvents() != null) {
            sp = sp.withMaxUncommittedEvents(config.getMaxUncommittedEvents().intValue());
        }
        if (config.getStreamKeepAliveLimit() != null) {
            sp = sp.withStreamKeepAliveLimit(config.getStreamKeepAliveLimit().intValue());
        }
        if (config.getStreamLimit() != null) {
            sp = sp.withStreamLimit(config.getStreamLimit().intValue());
        }
        if (config.getStreamTimeout() != null) {
            sp = sp.withStreamTimeout(config.getStreamTimeout().intValue());
        }
        return sp;
    }

    public FahrscheinNakadiConsumer(@NonNull NakadiClient nakadiClient, @NonNull ConsumerConfig consumerConfig) {
        if (nakadiClient == null) {
            throw new NullPointerException("nakadiClient is marked non-null but is null");
        }
        if (consumerConfig == null) {
            throw new NullPointerException("consumerConfig is marked non-null but is null");
        }
        this.nakadiClient = nakadiClient;
        this.consumerConfig = consumerConfig;
    }
}

