/*
 * Decompiled with CFR 0.152.
 */
package org.imixs.workflow.kafka;

import jakarta.annotation.PostConstruct;
import jakarta.ejb.ConcurrencyManagement;
import jakarta.ejb.ConcurrencyManagementType;
import jakarta.ejb.Singleton;
import jakarta.enterprise.event.Observes;
import jakarta.xml.bind.JAXBException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.imixs.workflow.ItemCollection;
import org.imixs.workflow.engine.ProcessingEvent;
import org.imixs.workflow.exceptions.AdapterException;
import org.imixs.workflow.exceptions.ProcessingErrorException;
import org.imixs.workflow.kafka.ConfigService;
import org.imixs.workflow.xml.XMLDocumentAdapter;

@Singleton
@ConcurrencyManagement(value=ConcurrencyManagementType.BEAN)
public class ProducerService
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static Logger logger = Logger.getLogger(ProducerService.class.getName());
    Producer<Long, String> producer;

    @PostConstruct
    void init() {
        logger.info("...init KafkaProducer...");
        Properties props = new Properties();
        props.put("bootstrap.servers", ConfigService.getEnv("KAFKA_BROKERS", "kafka:9092"));
        props.put("client.id", ConfigService.getEnv("KAFKA_CLIENTID", "Imixs-Workflow-1"));
        props.put("key.serializer", LongSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer(props);
    }

    public void onProcess(@Observes ProcessingEvent documentEvent) {
        if (2 == documentEvent.getEventType()) {
            String modelPattern = ConfigService.getEnv("KAFKA_AUTOWIRE", null);
            if (modelPattern != null && !modelPattern.isEmpty()) {
                String modelVersion = documentEvent.getDocument().getModelVersion();
                Pattern regexPattern = Pattern.compile(modelPattern);
                if (!regexPattern.matcher(modelVersion).find()) {
                    return;
                }
            }
            logger.info("...consuming ProcssingEvent (model:" + modelPattern + ") -> send new kafka event...");
            try {
                this.sendWorkitem(documentEvent.getDocument());
            }
            catch (AdapterException e) {
                throw new ProcessingErrorException(e.getErrorContext(), e.getErrorCode(), e.getMessage(), (Exception)((Object)e));
            }
        }
    }

    public void sendWorkitem(ItemCollection workitem) throws AdapterException {
        String uid = workitem.getUniqueID();
        String topic = workitem.getModelVersion();
        try {
            byte[] value = XMLDocumentAdapter.writeItemCollection((ItemCollection)workitem);
            ProducerRecord record = new ProducerRecord(topic, (Object)new String(value));
            RecordMetadata metadata = (RecordMetadata)this.producer.send(record).get();
            logger.info("...Imixs-Workflow Event sent with key " + uid + " to partition " + metadata.partition() + " with offset " + metadata.offset());
        }
        catch (ExecutionException e) {
            throw new AdapterException(ProducerService.class.getSimpleName(), "EXECUTION-EXCEPTION", e.getMessage(), (Exception)e);
        }
        catch (InterruptedException e) {
            throw new AdapterException(ProducerService.class.getSimpleName(), "INTERUPTED-EXCEPTION", e.getMessage(), (Exception)e);
        }
        catch (JAXBException e) {
            throw new AdapterException(ProducerService.class.getSimpleName(), "JAXB-EXCEPTION", e.getMessage(), (Exception)((Object)e));
        }
        catch (IOException e) {
            throw new AdapterException(ProducerService.class.getSimpleName(), "IO-EXCEPTION", e.getMessage(), (Exception)e);
        }
    }
}

