/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;

import java.util.Objects;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler;
import pl.allegro.tech.hermes.consumers.consumer.filtering.chain.FilterChain;
import pl.allegro.tech.hermes.consumers.consumer.filtering.chain.FilterChainFactory;
import pl.allegro.tech.hermes.consumers.consumer.filtering.chain.FilterResult;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;

public class FilteringMessageReceiver
implements MessageReceiver {
    private MessageReceiver receiver;
    private FilteredMessageHandler filteredMessageHandler;
    private FilterChainFactory filterChainFactory;
    private volatile FilterChain filterChain;
    private Subscription subscription;
    private boolean consuming = true;

    public FilteringMessageReceiver(MessageReceiver receiver, FilteredMessageHandler filteredMessageHandler, FilterChainFactory filterChainFactory, Subscription subscription) {
        this.receiver = receiver;
        this.filteredMessageHandler = filteredMessageHandler;
        this.filterChainFactory = filterChainFactory;
        this.subscription = subscription;
        this.filterChain = filterChainFactory.create(subscription);
    }

    @Override
    public Message next() {
        Message message;
        do {
            message = this.receiver.next();
        } while (this.consuming && this.filter(message));
        return message;
    }

    private boolean filter(Message message) {
        FilterResult result = this.filterChain.apply(message);
        this.filteredMessageHandler.handle(result, message, this.subscription);
        return result.isFiltered();
    }

    @Override
    public void stop() {
        this.consuming = false;
        this.receiver.stop();
    }

    @Override
    public void update(Subscription newSubscription) {
        if (!Objects.equals(this.subscription.getFilters(), newSubscription.getFilters())) {
            this.filterChain = this.filterChainFactory.create(newSubscription);
        }
        this.subscription = newSubscription;
        this.receiver.update(newSubscription);
    }
}

