/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.kafka.log;

import ch.admin.bit.jeap.messaging.kafka.log.MessageLogger;
import ch.admin.bit.jeap.messaging.kafka.log.TopicLogger;
import ch.admin.bit.jeap.messaging.kafka.tracing.TracerBridge;
import java.util.Map;
import lombok.Generated;
import net.logstash.logback.argument.StructuredArguments;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

public class ConsumerLoggingInterceptor
implements ConsumerInterceptor<Object, Object> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ConsumerLoggingInterceptor.class);
    public static final String TRACER_BRIDGE = "consumerlogginginterceptor.tracerbridge";
    public static final String CLUSTER_NAME_CONFIG = "consumerlogginginterceptor.clustername";
    private TracerBridge tracerBridge;
    private String clusterName;

    public void configure(Map<String, ?> configs) {
        String clusterName;
        if (configs.containsKey(TRACER_BRIDGE)) {
            this.tracerBridge = (TracerBridge)configs.get(TRACER_BRIDGE);
        }
        if (!StringUtils.hasText((String)(clusterName = (String)configs.get(CLUSTER_NAME_CONFIG)))) {
            throw new IllegalStateException("Mandatory config property %s is missing".formatted(CLUSTER_NAME_CONFIG));
        }
        this.clusterName = clusterName;
    }

    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        records.forEach(this::onConsume);
        return records;
    }

    private void onConsume(ConsumerRecord<Object, Object> record) {
        this.backupTracingHeaders(record);
        try (TracerBridge.TracerBridgeElement span = this.getSpan(record);){
            if (log.isInfoEnabled()) {
                log.info("Received {} from {} with offset {} on cluster {}", new Object[]{MessageLogger.message(record.value()), TopicLogger.topic(record), record.offset(), this.clusterName});
            }
        }
    }

    private void backupTracingHeaders(ConsumerRecord<Object, Object> record) {
        if (this.tracerBridge != null) {
            this.tracerBridge.backupOriginalTraceContext(record);
        }
    }

    private TracerBridge.TracerBridgeElement getSpan(ConsumerRecord<?, ?> record) {
        if (this.tracerBridge != null) {
            return this.tracerBridge.getSpan(record);
        }
        return () -> {};
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach(this::onCommit);
    }

    private void onCommit(TopicPartition topic, OffsetAndMetadata offset) {
        if (offset.metadata() == null) {
            log.debug("Commit offset {} on {}", (Object)StructuredArguments.value((String)"offset", (Object)offset.offset()), (Object)TopicLogger.topic(topic));
        } else {
            log.debug("Commit offset {} on {} with {}", new Object[]{StructuredArguments.value((String)"offset", (Object)offset.offset()), TopicLogger.topic(topic), StructuredArguments.value((String)"metadata", (Object)offset.metadata())});
        }
    }

    public void close() {
    }
}

