/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.kafka.contract;

import ch.admin.bit.jeap.messaging.avro.AvroMessage;
import ch.admin.bit.jeap.messaging.kafka.contract.ContractsValidator;
import ch.admin.bit.jeap.messaging.kafka.contract.NoContractException;
import ch.admin.bit.jeap.messaging.model.MessageType;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerContractInterceptor
implements ConsumerInterceptor<Object, Object> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ConsumerContractInterceptor.class);
    public static final String CONTRACTS_VALIDATOR = "consumerContractInterceptor.contractsValidator";
    public static final String ALLOW_NO_CONTRACT_EVENTS = "consumerContractInterceptor.allowNoContractEvents";
    public static final String SILENT_IGNORE_NO_CONTRACT_EVENTS = "consumerContractInterceptor.silentIgnoreWithoutContract";
    public static final String EXEMPT_FROM_CONSUMER_CONTRACT_CHECK = "consumerContractInterceptor.exemptFromConsumerContractCheck";
    private ContractsValidator contractsValidator;
    private boolean allowNoContractEvents;
    private boolean silentIgnoreWithoutContract;

    public void configure(Map<String, ?> configs) {
        this.contractsValidator = (ContractsValidator)configs.get(CONTRACTS_VALIDATOR);
        this.allowNoContractEvents = (Boolean)configs.get(ALLOW_NO_CONTRACT_EVENTS);
        this.silentIgnoreWithoutContract = (Boolean)configs.get(SILENT_IGNORE_NO_CONTRACT_EVENTS);
        boolean exemptFromConsumerContractCheck = ConsumerContractInterceptor.isExemptFromConsumerContractCheck(configs);
        if (exemptFromConsumerContractCheck) {
            this.silentIgnoreWithoutContract = true;
            this.allowNoContractEvents = true;
        }
    }

    private static boolean isExemptFromConsumerContractCheck(Map<String, ?> configs) {
        Object exemptPropertyValue = configs.get(EXEMPT_FROM_CONSUMER_CONTRACT_CHECK);
        return exemptPropertyValue != null && "true".equals(exemptPropertyValue.toString());
    }

    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        if (this.silentIgnoreWithoutContract) {
            return records;
        }
        Map filtered = records.partitions().stream().collect(Collectors.toMap(Function.identity(), partition -> this.onConsumeTopicPartition(records.records(partition))));
        return new ConsumerRecords(filtered);
    }

    private List<ConsumerRecord<Object, Object>> onConsumeTopicPartition(List<ConsumerRecord<Object, Object>> input) {
        return input.stream().filter(this::isConsummationAllowed).toList();
    }

    private boolean isConsummationAllowed(ConsumerRecord<Object, Object> record) {
        Object object = record.value();
        if (!(object instanceof AvroMessage)) {
            return true;
        }
        AvroMessage avroMessage = (AvroMessage)object;
        MessageType type = avroMessage.getType();
        String topic = record.topic();
        try {
            this.contractsValidator.ensureConsumerContract(type, topic);
            return true;
        }
        catch (NoContractException e) {
            if (this.allowNoContractEvents) {
                String message = String.format("You have no contract to consume events of type %s from topic %s. However as consumeWithoutContractAllowed is ON this event will still be consumed. If you do not want to see this message set silentIgnoreWithoutContract to true.", type, topic);
                log.warn(message);
            } else {
                String message = String.format("You have no contract to consume events of type %s from topic %s. This event is filtered out and will not be consumed by this application. Use consumeWithoutContractAllowed to change this behavior in a dev environment. If you do not want to see this message set silentIgnoreWithoutContract to true.", type, topic);
                log.warn(message, (Throwable)e);
            }
            return this.allowNoContractEvents;
        }
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    public void close() {
    }
}

