/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.messaging.kafka.contract;

import ch.admin.bit.jeap.messaging.avro.AvroMessage;
import ch.admin.bit.jeap.messaging.kafka.contract.ContractsValidator;
import ch.admin.bit.jeap.messaging.kafka.contract.NoContractException;
import ch.admin.bit.jeap.messaging.kafka.contract.NoContractProducerRecord;
import ch.admin.bit.jeap.messaging.model.MessageType;
import java.util.Map;
import lombok.Generated;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerContractInterceptor
implements ProducerInterceptor<Object, Object> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ProducerContractInterceptor.class);
    public static final String CONTRACTS_VALIDATOR = "producerContractInterceptor.contractsValidator";
    public static final String ALLOW_NO_CONTRACT_EVENTS = "producerContractInterceptor.allowNoContractEvents";
    public static final String ALLOW_NO_CONTRACT_EVENTS_SILENT = "producerContractInterceptor.allowNoContractEventsSilent";
    public static final String EXEMPT_FROM_PRODUCER_CONTRACT_CHECK_HEADER = "exemptFromProducerContractCheckMarker";
    private ContractsValidator contractsValidator;
    private boolean allowNoContractEvents;
    private boolean allowNoContractEventsSilent;

    public void configure(Map<String, ?> configs) {
        this.contractsValidator = (ContractsValidator)configs.get(CONTRACTS_VALIDATOR);
        this.allowNoContractEvents = (Boolean)configs.get(ALLOW_NO_CONTRACT_EVENTS);
        this.allowNoContractEventsSilent = (Boolean)configs.get(ALLOW_NO_CONTRACT_EVENTS_SILENT);
    }

    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
        if (this.allowNoContractEventsSilent) {
            return record;
        }
        if (ProducerContractInterceptor.isExemptFromProducerContractCheck(record)) {
            return record;
        }
        Object object = record.value();
        if (!(object instanceof AvroMessage)) {
            return record;
        }
        AvroMessage avroMessage = (AvroMessage)object;
        MessageType type = avroMessage.getType();
        String topic = record.topic();
        try {
            this.contractsValidator.ensurePublisherContract(type, topic);
            return record;
        }
        catch (NoContractException e) {
            if (this.allowNoContractEvents) {
                String message = String.format("You have no contract to publish events of type %s on topic %s but still do so. However as publishWithoutContractAllowed is ON this event will still be published", type, topic);
                log.warn(message);
                return record;
            }
            String message = String.format("You have no contract to publish events of type %s on topic %s but still do so. This event is NOT published and an exception is send to the application. Use publishWithoutContractAllowed to change this behavior in a dev environment", type, topic);
            log.error(message, (Throwable)e);
            return new NoContractProducerRecord<Object, Object>(e);
        }
    }

    private static boolean isExemptFromProducerContractCheck(ProducerRecord<Object, Object> producerRecord) {
        Iterable exemptHeaders = producerRecord.headers().headers(EXEMPT_FROM_PRODUCER_CONTRACT_CHECK_HEADER);
        return exemptHeaders.iterator().hasNext();
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    public void close() {
    }
}

