/*
 * Decompiled with CFR 0.152.
 */
package org.qubership.nifi.service;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.prometheus.client.exporter.common.TextFormat;
import jakarta.servlet.Servlet;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.qubership.nifi.service.recordSink.MetricCompositeKey;

@Tags(value={"record", "send", "write", "prometheus"})
public class QubershipPrometheusRecordSink
extends AbstractControllerService
implements RecordSinkService {
    private Server prometheusServer;
    public PrometheusMeterRegistry meterRegistry;
    private static final List<PropertyDescriptor> PROPERTIES;
    private int metricsEndpointPort;
    private boolean clearMetrics;
    protected String namespace;
    protected String hostname;
    private String instance;
    private Map<MetricCompositeKey, Number> metricSet = new ConcurrentHashMap<MetricCompositeKey, Number>();
    public static final PropertyDescriptor METRICS_ENDPOINT_PORT;
    public static final PropertyDescriptor INSTANCE_ID;
    public static final PropertyDescriptor CLEAR_METRICS;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    protected void startServer() {
        try {
            this.prometheusServer = new Server(this.metricsEndpointPort);
            ServletContextHandler servletContextHandler = new ServletContextHandler();
            servletContextHandler.setContextPath("/");
            servletContextHandler.addServlet(new ServletHolder((Servlet)new PrometheusServlet()), "/metrics");
            this.prometheusServer.setHandler((Handler)servletContextHandler);
            this.prometheusServer.start();
        }
        catch (Exception e) {
            this.getLogger().error("Error while starting Jetty server {}", (Throwable)e);
            throw new ProcessException("Error while starting Jetty server {}", (Throwable)e);
        }
    }

    protected String getNamespace() {
        return System.getenv("NAMESPACE");
    }

    protected String getHostname() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName();
        }
        catch (UnknownHostException ex) {
            this.getLogger().warn("Error while getting host name {}", new Object[]{new Object[]{ex.getLocalizedMessage()}, ex});
            return "cloud-data-migration-nifi";
        }
    }

    @OnEnabled
    public void onScheduled(ConfigurationContext context) {
        this.meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
        this.metricsEndpointPort = context.getProperty(METRICS_ENDPOINT_PORT).asInteger();
        this.clearMetrics = context.getProperty(CLEAR_METRICS).getValue().equals("Yes");
        this.namespace = this.getNamespace();
        this.hostname = this.getHostname();
        this.instance = this.namespace + "_" + this.hostname;
        this.startServer();
    }

    @OnDisabled
    public void onStopped() throws Exception {
        if (this.clearMetrics) {
            this.meterRegistry.clear();
            this.metricSet.clear();
        }
        if (this.prometheusServer != null) {
            this.getLogger().info("Jetty server is shutting down");
            this.prometheusServer.stop();
        }
    }

    @OnShutdown
    public void onShutDown() throws Exception {
        if (this.prometheusServer != null) {
            this.getLogger().info("Jetty server is shutting down");
            this.prometheusServer.stop();
        }
    }

    public WriteResult sendData(RecordSet recordSet, Map<String, String> map, boolean b) throws IOException {
        Record r;
        String[] staticLabelNames = new String[]{"namespace", "hostname", "instance"};
        WriteResult writeResult = null;
        RecordSchema recordSchema = recordSet.getSchema();
        String[] labelNames = (String[])Stream.concat(recordSchema.getFields().stream().filter(f -> this.isLabel(f.getDataType().getFieldType())).map(RecordField::getFieldName).sorted(), Arrays.stream(staticLabelNames)).toArray(String[]::new);
        int recordCount = 0;
        while ((r = recordSet.next()) != null) {
            Record record = r;
            String[] labelValues = (String[])Arrays.stream(labelNames).map(label -> {
                String value = record.getAsString(label);
                if (value != null) {
                    return value;
                }
                if ("namespace".equals(label)) {
                    return this.namespace;
                }
                if ("hostname".equals(label)) {
                    return this.hostname;
                }
                if ("instance".equals(label)) {
                    return this.instance;
                }
                return "";
            }).toArray(String[]::new);
            recordSchema.getFields().stream().filter(field -> this.isNumeric(field.getDataType().getFieldType())).forEach(recordField -> {
                MetricCompositeKey metricCompositeKey = new MetricCompositeKey(recordField.getFieldName(), labelNames, labelValues);
                Number num = this.convertNum(record, recordField.getFieldName());
                if (num != null) {
                    this.metricSet.put(metricCompositeKey, this.convertNum(record, recordField.getFieldName()));
                    Optional dataType = record.getSchema().getDataType(metricCompositeKey.getMetricName());
                    if (dataType.isPresent()) {
                        Gauge.builder((String)metricCompositeKey.getMetricName(), () -> {
                            Number value = this.getMetricValue(metricCompositeKey);
                            return value == null ? (Number)0 : (Number)value;
                        }).tags((Iterable)IntStream.range(0, labelValues.length).filter(i -> labelValues[i] != null).mapToObj(i -> Tag.of((String)labelNames[i], (String)labelValues[i])).collect(Collectors.toList())).register((MeterRegistry)this.meterRegistry);
                    }
                }
            });
            recordSchema.getFields().stream().filter(recordField -> this.isRecord((RecordField)recordField)).forEach(recordField -> {
                Optional dataType;
                String metricName = recordField.getFieldName();
                Record metricRecord = (Record)record.getValue(recordField);
                Number value = this.convertNum(metricRecord, "value");
                String type = metricRecord.getAsString("type");
                if (value != null && (dataType = record.getSchema().getDataType(metricName)).isPresent()) {
                    List<Tag> tagsList = IntStream.range(0, labelValues.length).filter(i -> labelValues[i] != null).mapToObj(i -> Tag.of((String)labelNames[i], (String)labelValues[i])).collect(Collectors.toList());
                    if ("Counter".equals(type)) {
                        Counter counter = this.createCounter(metricName, tagsList);
                        counter.increment(value.doubleValue());
                    } else if ("Summary".equals(type)) {
                        DistributionSummary distributionSummary = this.createSummary(metricName, metricRecord, tagsList);
                        distributionSummary.record(value.doubleValue());
                    }
                }
            });
            ++recordCount;
        }
        map.put("record.count", Integer.toString(recordCount));
        writeResult = WriteResult.of((int)recordCount, map);
        return writeResult;
    }

    private synchronized Counter createCounter(String metricName, List<Tag> tagsList) {
        Counter counter = Counter.builder((String)metricName).tags(tagsList).register((MeterRegistry)this.meterRegistry);
        return counter;
    }

    private synchronized DistributionSummary createSummary(String metricName, Record metricRecord, List<Tag> tagsList) {
        Duration statisticExpiry = null;
        double[] publishPercentiles = null;
        Object[] publishPercentilesObject = metricRecord.getAsArray("publishPercentiles");
        if (publishPercentilesObject != null) {
            publishPercentiles = new double[publishPercentilesObject.length];
            for (int i = 0; i < publishPercentilesObject.length; ++i) {
                if (!(publishPercentilesObject[i] instanceof Number)) continue;
                publishPercentiles[i] = ((Number)publishPercentilesObject[i]).doubleValue();
            }
        }
        if (StringUtils.isNotEmpty((CharSequence)metricRecord.getAsString("statisticExpiry"))) {
            statisticExpiry = Duration.parse(metricRecord.getAsString("statisticExpiry"));
        }
        DistributionSummary distributionSummary = DistributionSummary.builder((String)metricName).tags(tagsList).distributionStatisticBufferLength(metricRecord.getAsInt("statisticBufferLength")).distributionStatisticExpiry(statisticExpiry).publishPercentiles(publishPercentiles).register((MeterRegistry)this.meterRegistry);
        return distributionSummary;
    }

    private Number getMetricValue(MetricCompositeKey metricCompositeKey) {
        return this.metricSet.get(metricCompositeKey);
    }

    private Number convertNum(Record record, String name) {
        Number num = null;
        switch (((DataType)record.getSchema().getDataType(name).get()).getFieldType()) {
            case INT: {
                num = record.getAsInt(name);
                break;
            }
            case LONG: {
                num = record.getAsLong(name);
                break;
            }
            case FLOAT: {
                num = record.getAsFloat(name);
                break;
            }
            case DOUBLE: {
                num = record.getAsDouble(name);
                break;
            }
            case DECIMAL: {
                if (((DataType)record.getSchema().getDataType(name).get()).getFieldType() == RecordFieldType.DOUBLE) {
                    num = record.getAsDouble(name);
                    break;
                }
                num = record.getAsFloat(name);
                break;
            }
            case CHOICE: {
                num = NumberUtils.createNumber((String)record.getAsString(name));
                break;
            }
            case BOOLEAN: {
                num = record.getAsBoolean(name) != false ? Integer.valueOf(1) : Integer.valueOf(0);
            }
        }
        return num;
    }

    private boolean isLabel(RecordFieldType dataType) {
        return RecordFieldType.STRING.equals((Object)dataType) || RecordFieldType.CHAR.equals((Object)dataType);
    }

    private boolean isNumeric(RecordFieldType dataType) {
        return RecordFieldType.INT.equals((Object)dataType) || RecordFieldType.SHORT.equals((Object)dataType) || RecordFieldType.LONG.equals((Object)dataType) || RecordFieldType.BIGINT.equals((Object)dataType) || RecordFieldType.FLOAT.equals((Object)dataType) || RecordFieldType.DOUBLE.equals((Object)dataType) || RecordFieldType.DECIMAL.equals((Object)dataType) || RecordFieldType.BOOLEAN.equals((Object)dataType) || RecordFieldType.CHOICE.equals((Object)dataType);
    }

    private boolean isRecord(RecordField field) {
        if (RecordFieldType.RECORD.equals((Object)field.getDataType().getFieldType())) {
            RecordSchema schema = ((RecordDataType)field.getDataType()).getChildSchema();
            return schema.getField("type").isPresent() && schema.getField("value").isPresent();
        }
        return false;
    }

    static {
        METRICS_ENDPOINT_PORT = new PropertyDescriptor.Builder().name("prometheus-sink-metrics-endpoint-port").displayName("Prometheus Metrics Endpoint Port").description("The Port where prometheus metrics can be accessed").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).defaultValue("9092").addValidator(StandardValidators.INTEGER_VALIDATOR).build();
        INSTANCE_ID = new PropertyDescriptor.Builder().name("prometheus-sink-instance-id").displayName("Instance ID").description("Id of this NiFi instance to be included in the metrics sent to Prometheus").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).defaultValue("${hostname(true)}_${NAMESPACE}").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        CLEAR_METRICS = new PropertyDescriptor.Builder().name("prometheus-sink-clear-metrics").displayName("Clear Metrics on Disable").description("If set to Yes, all metrics stored in the controller service are cleared, when the controller service is disabled. By default, metrics are not cleared.").defaultValue("No").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).allowableValues(new String[]{"Yes", "No"}).required(true).build();
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.add(METRICS_ENDPOINT_PORT);
        props.add(INSTANCE_ID);
        props.add(CLEAR_METRICS);
        PROPERTIES = Collections.unmodifiableList(props);
    }

    private final class PrometheusServlet
    extends HttpServlet {
        private PrometheusServlet() {
        }

        protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
            resp.setStatus(200);
            resp.setContentType("text/plain; version=0.0.4; charset=utf-8");
            try (PrintWriter writer = resp.getWriter();){
                TextFormat.write004((Writer)writer, (Enumeration)QubershipPrometheusRecordSink.this.meterRegistry.getPrometheusRegistry().metricFamilySamples());
                ((Writer)writer).flush();
            }
            catch (IOException e) {
                QubershipPrometheusRecordSink.this.getLogger().error("Error while scraping metrics {}", (Throwable)e);
                throw new ProcessException("Error while scraping metrics {}", (Throwable)e);
            }
        }
    }
}

