/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.processcontext.adapter.kafka.message.consumer;

import ch.admin.bit.jeap.messaging.avro.AvroMessage;
import ch.admin.bit.jeap.messaging.avro.AvroMessageKey;
import ch.admin.bit.jeap.messaging.model.Message;
import ch.admin.bit.jeap.processcontext.adapter.kafka.message.consumer.KafkaEventListenerException;
import ch.admin.bit.jeap.processcontext.domain.message.MessageReceiver;
import ch.admin.bit.jeap.processcontext.plugin.api.message.MessageFilter;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

/**
 * Kafka listener that hands Avro messages of one configured event type to a
 * {@link MessageReceiver}.
 *
 * <p>A message is forwarded only when its type name equals the configured event name and,
 * if a {@link MessageFilter} is present, the filter returns {@code true} for it. Messages
 * that are skipped for either reason are only logged. In all of these cases the record is
 * acknowledged afterwards; it is not acknowledged when handling throws.
 */
class KafkaMessageListener implements AcknowledgingMessageListener<AvroMessageKey, AvroMessage> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageListener.class);

    private final String eventName;
    private final MessageReceiver messageReceiver;
    private final MessageFilter<AvroMessage> messageFilter;

    /**
     * @param eventName       type name a message must carry to be forwarded
     * @param messageReceiver target that receives the accepted messages
     * @param messageFilter   optional filter; {@code null} means every matching message is forwarded
     */
    @Generated
    public KafkaMessageListener(String eventName, MessageReceiver messageReceiver, MessageFilter<AvroMessage> messageFilter) {
        this.eventName = eventName;
        this.messageReceiver = messageReceiver;
        this.messageFilter = messageFilter;
    }

    /**
     * Processes one consumed record and acknowledges it once processing returned normally.
     *
     * @param record         the consumed record; its value and topic are passed on for handling
     * @param acknowledgment handle used to acknowledge the record after successful handling
     * @throws KafkaEventListenerException wrapping any exception raised while handling the
     *                                     record (the record is then left unacknowledged)
     */
    public void onMessage(ConsumerRecord<AvroMessageKey, AvroMessage> record, Acknowledgment acknowledgment) {
        try {
            AvroMessage payload = record.value();
            String sourceTopic = record.topic();
            handleMessage(payload, sourceTopic);
        } catch (Exception ex) {
            throw KafkaEventListenerException.from(ex);
        }
        // Deliberately outside the try block: failures of the acknowledgment itself are not wrapped.
        acknowledgment.acknowledge();
    }

    /**
     * Forwards the message to the receiver unless its type name differs from the configured
     * event name or the configured filter rejects it.
     */
    private void handleMessage(AvroMessage message, String topic) {
        String actualEventName = message.getType().getName();

        // Guard 1: some other event type arrived on this topic.
        if (!eventName.equals(actualEventName)) {
            log.debug("Ignoring event '{}' on topic '{}' as there is no matching message reference in the template/correlation provider", actualEventName, topic);
            return;
        }

        // Guard 2: a configured filter may veto the message; no filter means accept.
        boolean accepted = messageFilter == null || messageFilter.filter(message);
        if (!accepted) {
            log.trace("Message '{}' filtered out by the configured message filter and will be ignored", actualEventName);
            return;
        }

        messageReceiver.messageReceived(message);
    }
}

