/*
 * Decompiled with CFR 0.152.
 */
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnector;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorAuditCode;
import org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicConnectorErrorCode;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOpenMetadataEventProducer
implements Runnable {
    private final List<String> sendBuffer = Collections.synchronizedList(new ArrayList());
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventProducer.class);
    private static final String defaultThreadName = "KafkaProducer for topic ";
    private volatile boolean running = true;
    private final AuditLog auditLog;
    private final String listenerThreadName;
    private final String topicName;
    private final int sleepTime = 1000;
    private static final long recoverySleepTimeSec = 10L;
    private final String localServerId;
    private final Properties producerProperties;
    private Producer<String, String> producer = null;
    private KafkaOpenMetadataTopicConnector connector;
    private long messageSendCount = 0L;

    KafkaOpenMetadataEventProducer(String topicName, String localServerId, Properties producerProperties, KafkaOpenMetadataTopicConnector connector, AuditLog auditLog) {
        this.auditLog = auditLog;
        this.topicName = topicName;
        this.localServerId = localServerId;
        this.connector = connector;
        this.producerProperties = producerProperties;
        this.listenerThreadName = defaultThreadName + topicName;
        String actionDescription = "new producer";
        auditLog.logMessage("new producer", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_PRODUCER_PROPERTIES.getMessageDefinition(Integer.toString(producerProperties.size()), topicName), producerProperties.toString());
    }

    private void publishEvent(String event) throws ConnectorCheckedException {
        String methodName = "publishEvent";
        boolean eventSent = false;
        long eventRetryCount = 0L;
        if (this.producer == null) {
            log.debug("Creating Producer");
            this.producer = new KafkaProducer(this.producerProperties);
        }
        while (!eventSent) {
            try {
                log.debug("Sending message {0}" + event);
                ProducerRecord record = new ProducerRecord(this.topicName, (Object)this.localServerId, (Object)event);
                this.producer.send(record).get();
                eventSent = true;
                ++this.messageSendCount;
            }
            catch (ExecutionException error) {
                log.debug("Kafka had trouble sending event: " + event + "exception message is " + error.getMessage());
                if (!this.isExceptionRetryable(error)) {
                    this.producer.close();
                    this.producer = null;
                    throw new ConnectorCheckedException(KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(error.getClass().getName(), this.topicName, error.getMessage()), this.getClass().getName(), "publishEvent", (Throwable)error);
                }
                if (eventRetryCount == 10L) {
                    this.producer.close();
                    this.producer = null;
                    log.error("Retryable Exception closed producer ");
                    break;
                }
                if (eventRetryCount == 0L) {
                    this.auditLog.logMessage("publishEvent", KafkaOpenMetadataTopicConnectorAuditCode.EVENT_SEND_IN_ERROR_LOOP.getMessageDefinition(this.topicName, Long.toString(this.messageSendCount), Long.toString(this.getSendBufferSize()), error.getMessage()));
                }
                ++eventRetryCount;
            }
            catch (WakeupException error) {
                log.error("Wake up for shut down " + error.toString());
            }
            catch (Throwable error) {
                this.producer.close();
                this.producer = null;
                log.debug("Send Events Throwable catch block closed producer");
                log.error("Exception in sendEvent " + error.toString());
                throw new ConnectorCheckedException(KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(error.getClass().getName(), this.topicName, error.getMessage()), this.getClass().getName(), "publishEvent", error);
            }
        }
    }

    @Override
    public void run() {
        String actionDescription = this.listenerThreadName + ":run";
        this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_START.getMessageDefinition(this.topicName, String.valueOf(this.sendBuffer.size())), this.producerProperties.toString());
        while (this.isRunning()) {
            try {
                String bufferedEvent = this.getEvent();
                if (bufferedEvent == null) {
                    TimeUnit.MILLISECONDS.sleep(1000L);
                    continue;
                }
                while (bufferedEvent != null) {
                    this.publishEvent(bufferedEvent);
                    bufferedEvent = this.getEvent();
                }
            }
            catch (InterruptedException error) {
                log.info("Woken up from sleep " + error.getMessage());
            }
            catch (Throwable error) {
                log.error("Bad exception from sending events " + error.getMessage());
                if (!this.isExceptionRetryable(error)) break;
                this.recoverAfterError();
            }
        }
        if (this.producer != null) {
            log.debug("");
            this.producer.close();
            this.producer = null;
        }
        this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_SHUTDOWN.getMessageDefinition(this.topicName, Integer.toString(this.getSendBufferSize()), Long.toString(this.messageSendCount)), this.producerProperties.toString());
    }

    private void putEvent(String newEvent) {
        this.sendBuffer.add(newEvent);
    }

    private int getSendBufferSize() {
        return this.sendBuffer.size();
    }

    private String getEvent() {
        if (this.sendBuffer.isEmpty()) {
            return null;
        }
        return this.sendBuffer.remove(0);
    }

    public void sendEvent(String event) {
        this.putEvent(event);
    }

    protected void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", 10L));
        try {
            Thread.sleep(10000L);
        }
        catch (InterruptedException e1) {
            log.debug("Interrupted while recovering", (Throwable)e1);
        }
    }

    public void safeCloseProducer() {
        this.stopRunning();
    }

    private boolean isRunning() {
        return this.running;
    }

    private synchronized void stopRunning() {
        this.running = false;
    }

    private boolean isExceptionRetryable(Throwable throwable) {
        Throwable nested = null;
        while ((nested = throwable.getCause()) != null) {
            if (nested instanceof RetriableException) {
                return true;
            }
            throwable = throwable.getCause();
        }
        return false;
    }
}

