/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnector;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorErrorCode;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataEventProducer
implements Runnable {
    private volatile List<String> sendBuffer = new ArrayList<String>();
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventProducer.class);
    private static final String defaultThreadName = "KafkaProducer for topic ";
    private volatile boolean running = true;
    private OMRSAuditLog auditLog;
    private String listenerThreadName;
    private String topicName;
    private int sleepTime = 1000;
    private static final long recoverySleepTimeSec = 10L;
    private String localServerId;
    private Properties producerProperties;
    private Producer<String, String> producer;
    private KafkaOpenMetadataTopicConnector connector;
    private long messageSendCount = 0L;

    KafkaOpenMetadataEventProducer(String topicName, String localServerId, Properties producerProperties, KafkaOpenMetadataTopicConnector connector, OMRSAuditLog auditLog) {
        this.auditLog = auditLog;
        this.topicName = topicName;
        this.localServerId = localServerId;
        this.connector = connector;
        this.producerProperties = producerProperties;
        this.listenerThreadName = defaultThreadName + topicName;
        String actionDescription = "new producer";
        KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_PRODUCER_PROPERTIES;
        auditLog.logRecord("new producer", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(Integer.toString(producerProperties.size()), topicName), producerProperties.toString(), auditCode.getSystemAction(), auditCode.getUserAction());
    }

    private void publishEvent(String event) throws ConnectorCheckedException {
        String methodName = "publishEvent";
        boolean eventSent = false;
        long eventRetryCount = 0L;
        while (!eventSent) {
            try {
                log.debug("Sending message {0}" + event);
                ProducerRecord record = new ProducerRecord(this.topicName, (Object)this.localServerId, (Object)event);
                this.producer.send(record).get();
                eventSent = true;
                ++this.messageSendCount;
            }
            catch (ExecutionException error) {
                log.debug("Kafka had trouble sending event: " + event + "exception message is " + error.getMessage());
                if (eventRetryCount == 10L) {
                    eventRetryCount = 0L;
                    continue;
                }
                if (eventRetryCount == 0L) {
                    KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.EVENT_SEND_IN_ERROR_LOOP;
                    this.auditLog.logRecord("publishEvent", auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.topicName, Long.toString(this.messageSendCount), Long.toString(this.getSendBufferSize()), error.getMessage()), null, auditCode.getSystemAction(), auditCode.getUserAction());
                }
                ++eventRetryCount;
            }
            catch (WakeupException error) {
                log.error("Wake up for shut down " + error.toString());
            }
            catch (Throwable error) {
                log.error("Exception in sendEvent " + error.toString());
                KafkaOpenMetadataTopicConnectorErrorCode errorCode = KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT;
                String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(error.getClass().getName(), this.topicName, error.getMessage());
                throw new ConnectorCheckedException(errorCode.getHTTPErrorCode(), this.getClass().getName(), "publishEvent", errorMessage, errorCode.getSystemAction(), errorCode.getUserAction(), error);
            }
            finally {
                this.producer.flush();
            }
        }
    }

    @Override
    public void run() {
        String actionDescription = "run";
        KafkaOpenMetadataTopicConnectorAuditCode auditCode = KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_START;
        this.auditLog.logRecord(this.listenerThreadName, auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.topicName, Integer.toString(this.sendBuffer.size())), this.producerProperties.toString(), auditCode.getSystemAction(), auditCode.getUserAction());
        this.producer = new KafkaProducer(this.producerProperties);
        while (this.isRunning()) {
            try {
                String bufferedEvent = this.getEvent();
                if (bufferedEvent == null) {
                    Thread.sleep(this.sleepTime);
                    continue;
                }
                while (bufferedEvent != null) {
                    this.publishEvent(bufferedEvent);
                    bufferedEvent = this.getEvent();
                }
            }
            catch (InterruptedException error) {
                log.info("Woken up from sleep " + error.getMessage());
            }
            catch (Throwable error) {
                log.error("Bad exception from sending events " + error.getMessage());
                this.recoverAfterError();
            }
        }
        this.producer.close();
        this.producer = null;
        auditCode = KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_SHUTDOWN;
        this.auditLog.logRecord(this.listenerThreadName, auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(this.topicName, Integer.toString(this.getSendBufferSize()), Long.toString(this.messageSendCount)), this.producerProperties.toString(), auditCode.getSystemAction(), auditCode.getUserAction());
    }

    private synchronized void putEvent(String newEvent) {
        this.sendBuffer.add(newEvent);
    }

    private synchronized int getSendBufferSize() {
        return this.sendBuffer.size();
    }

    private synchronized String getEvent() {
        if (this.sendBuffer.isEmpty()) {
            return null;
        }
        return this.sendBuffer.remove(0);
    }

    public void sendEvent(String event) {
        this.putEvent(event);
    }

    protected void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", 10L));
        try {
            Thread.sleep(10000L);
        }
        catch (InterruptedException e1) {
            log.debug("Interrupted while recovering", (Throwable)e1);
        }
    }

    public void safeCloseProducer() {
        this.stopRunning();
    }

    private synchronized boolean isRunning() {
        return this.running;
    }

    private synchronized void stopRunning() {
        this.running = false;
    }
}

