/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl.ce;

import io.smallrye.reactive.messaging.ce.DefaultCloudEventMetadataBuilder;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.ce.impl.BaseCloudEventMetadata;
import io.smallrye.reactive.messaging.ce.impl.DefaultIncomingCloudEventMetadata;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaCloudEventMetadata;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.impl.ce.DefaultIncomingKafkaCloudEventMetadata;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.buffer.Buffer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.eclipse.microprofile.reactive.messaging.Message;

/**
 * Static helpers converting between Kafka records and Cloud Event metadata.
 *
 * <p>Two encodings are handled: the <em>structured</em> one, where the whole event is a JSON
 * document stored in the record value, and the <em>binary</em> one, where the event attributes
 * are stored in {@code ce_}-prefixed record headers and the record value is the event data.
 */
public class KafkaCloudEventHelper {
    public static final String KAFKA_HEADER_CONTENT_TYPE = "content-type";
    public static final String CE_CONTENT_TYPE_PREFIX = "application/cloudevents";
    public static final String CE_HEADER_PREFIX = "ce_";
    public static final String STRUCTURED_CONTENT_TYPE = "application/cloudevents+json; charset=UTF-8";
    public static final String KAFKA_HEADER_FOR_SPEC_VERSION = "ce_specversion";
    public static final String KAFKA_HEADER_FOR_TYPE = "ce_type";
    public static final String KAFKA_HEADER_FOR_SOURCE = "ce_source";
    public static final String KAFKA_HEADER_FOR_ID = "ce_id";
    public static final String KAFKA_HEADER_FOR_SCHEMA = "ce_dataschema";
    public static final String KAFKA_HEADER_FOR_CONTENT_TYPE = "ce_datacontenttype";
    public static final String KAFKA_HEADER_FOR_SUBJECT = "ce_subject";
    public static final String KAFKA_HEADER_FOR_TIME = "ce_time";

    /**
     * Formatter used for the {@code time} attribute: {@code yyyy-MM-dd'T'HH:mm:ss}, an optional
     * fraction of up to nine digits, then a zone or offset id. It has no override zone, so it can
     * only format temporals that carry their own zone (e.g. {@link ZonedDateTime}), not a bare
     * {@link Instant}.
     */
    public static final DateTimeFormatter RFC3339_DATE_FORMAT = new DateTimeFormatterBuilder()
            .appendPattern("yyyy-MM-dd'T'HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .appendZoneOrOffsetId()
            .toFormatter();

    /** Name of the extension attribute carrying the Kafka record key. */
    private static final String CE_EXTENSION_KAFKA_KEY = "partitionkey";

    /** Name of the extension attribute carrying the Kafka topic. */
    private static final String CE_EXTENSION_KAFKA_TOPIC = "kafkatopic";

    private KafkaCloudEventHelper() {
        // Utility class, not meant to be instantiated.
    }

    /**
     * Builds the Cloud Event metadata from a record whose value is a structured (JSON) Cloud Event.
     *
     * <p>The record key (when non-null) and topic are exposed as the {@code partitionkey} and
     * {@code kafkatopic} extensions. The built metadata is validated before being returned.
     *
     * @param record the consumed record; its value must be a {@link JsonObject}, a {@link String}
     *        or a {@code byte[]}
     * @param <T> the type of the record value / event data
     * @param <K> the type of the record key
     * @return the incoming Cloud Event metadata
     * @throws IllegalArgumentException if the value has an unsupported type (including
     *         {@code null}) or the JSON document has no {@code source} attribute
     */
    public static <T, K> IncomingKafkaCloudEventMetadata<K, T> createFromStructuredCloudEvent(ConsumerRecord<K, T> record) {
        DefaultCloudEventMetadataBuilder<T> builder = new DefaultCloudEventMetadataBuilder<>();
        JsonObject content = readStructuredContent(record.value());

        // Mandatory attributes.
        builder.withSpecVersion(content.getString("specversion"));
        builder.withId(content.getString("id"));
        String source = content.getString("source");
        if (source == null) {
            throw new IllegalArgumentException("The JSON value must contain the source attribute");
        }
        builder.withSource(URI.create(source));
        builder.withType(content.getString("type"));

        // Optional attributes.
        String dataContentType = content.getString("datacontenttype");
        if (dataContentType != null) {
            builder.withDataContentType(dataContentType);
        }
        String schema = content.getString("dataschema");
        if (schema != null) {
            builder.withDataSchema(URI.create(schema));
        }
        String subject = content.getString("subject");
        if (subject != null) {
            builder.withSubject(subject);
        }
        String time = content.getString("time");
        if (time != null) {
            builder.withTimestamp(ZonedDateTime.parse(time, RFC3339_DATE_FORMAT));
        }

        // Kafka-specific extensions.
        if (record.key() != null) {
            builder.withExtension(CE_EXTENSION_KAFKA_KEY, record.key());
        }
        builder.withExtension(CE_EXTENSION_KAFKA_TOPIC, record.topic());

        // The data is whatever the JSON document holds; the caller chooses T, so this cast cannot be checked.
        @SuppressWarnings("unchecked")
        T data = (T) content.getValue("data");
        builder.withData(data);

        BaseCloudEventMetadata<T> cloudEventMetadata = builder.build();
        cloudEventMetadata.validate();
        return new DefaultIncomingKafkaCloudEventMetadata<>(new DefaultIncomingCloudEventMetadata<>(cloudEventMetadata));
    }

    /**
     * Converts the value of a structured Cloud Event record into a JSON object.
     *
     * @param value the record value
     * @return the parsed JSON document
     * @throws IllegalArgumentException if the value is neither a {@link JsonObject}, a
     *         {@link String} nor a {@code byte[]}
     */
    private static JsonObject readStructuredContent(Object value) {
        if (value instanceof JsonObject) {
            return (JsonObject) value;
        }
        if (value instanceof String) {
            return new JsonObject((String) value);
        }
        if (value instanceof byte[]) {
            return Buffer.buffer((byte[]) value).toJsonObject();
        }
        // A null value (e.g. a tombstone record) must be reported like any other unsupported value, not as a NullPointerException.
        String found = value == null ? "null" : String.valueOf(value.getClass());
        throw new IllegalArgumentException("Invalid value type. Structured Cloud Event can only be created from String, JsonObject and byte[], found: " + found);
    }

    /**
     * Builds the Cloud Event metadata from a record using the binary encoding, i.e. with the
     * event attributes stored in the record headers and the data stored in the record value.
     *
     * <p>The data content type is read from the {@code content-type} header. Every remaining
     * {@code ce_}-prefixed header is exposed as an extension, named after the header with the
     * prefix removed. The record key (when non-null) and topic are exposed as the
     * {@code partitionkey} and {@code kafkatopic} extensions. Headers with a {@code null} value
     * are ignored. Unlike the structured variant, the built metadata is not validated here.
     *
     * @param record the consumed record
     * @param <T> the type of the record value / event data
     * @param <K> the type of the record key
     * @return the incoming Cloud Event metadata
     * @throws IllegalArgumentException if the record has no {@code ce_source} header
     */
    public static <T, K> IncomingKafkaCloudEventMetadata<K, T> createFromBinaryCloudEvent(ConsumerRecord<?, T> record) {
        DefaultCloudEventMetadataBuilder<T> builder = new DefaultCloudEventMetadataBuilder<>();

        // Decode the headers once; known attributes are removed as they are consumed so that only extensions remain.
        Map<String, String> headers = new HashMap<>();
        for (Header header : record.headers()) {
            byte[] value = header.value();
            // Kafka allows null header values; decoding them would throw a NullPointerException.
            if (value != null) {
                headers.put(header.key(), new String(value, StandardCharsets.UTF_8));
            }
        }

        // Mandatory attributes.
        builder.withSpecVersion(headers.remove(KAFKA_HEADER_FOR_SPEC_VERSION));
        builder.withId(headers.remove(KAFKA_HEADER_FOR_ID));
        String source = headers.remove(KAFKA_HEADER_FOR_SOURCE);
        if (source == null) {
            throw new IllegalArgumentException("The Kafka record must contain the ce_source header");
        }
        builder.withSource(URI.create(source));
        builder.withType(headers.remove(KAFKA_HEADER_FOR_TYPE));

        // Optional attributes.
        String dataContentType = headers.remove(KAFKA_HEADER_CONTENT_TYPE);
        if (dataContentType != null) {
            builder.withDataContentType(dataContentType);
        }
        String schema = headers.remove(KAFKA_HEADER_FOR_SCHEMA);
        if (schema != null) {
            builder.withDataSchema(URI.create(schema));
        }
        String subject = headers.remove(KAFKA_HEADER_FOR_SUBJECT);
        if (subject != null) {
            builder.withSubject(subject);
        }
        String time = headers.remove(KAFKA_HEADER_FOR_TIME);
        if (time != null) {
            builder.withTimestamp(ZonedDateTime.parse(time, RFC3339_DATE_FORMAT));
        }

        // Kafka-specific extensions.
        if (record.key() != null) {
            builder.withExtension(CE_EXTENSION_KAFKA_KEY, record.key());
        }
        builder.withExtension(CE_EXTENSION_KAFKA_TOPIC, record.topic());

        // Remaining ce_* headers are extensions.
        headers.forEach((name, value) -> {
            if (name.startsWith(CE_HEADER_PREFIX)) {
                builder.withExtension(name.substring(CE_HEADER_PREFIX.length()), value);
            }
        });

        builder.withData(record.value());
        BaseCloudEventMetadata<T> cloudEventMetadata = builder.build();
        return new DefaultIncomingKafkaCloudEventMetadata<>(new DefaultIncomingCloudEventMetadata<>(cloudEventMetadata));
    }

    /**
     * Creates a producer record encoding the message as a binary Cloud Event: the attributes are
     * written as {@code ce_}-prefixed headers (after the headers taken from {@code metadata}) and
     * the message payload is used as the record value.
     *
     * <p>The {@code ce_time} header is taken, in order of preference, from the Cloud Event
     * metadata, from the Kafka record timestamp (expressed in UTC), or from the current time when
     * the configuration asks for timestamp insertion. The data content type, when known, is
     * written to both the {@code ce_datacontenttype} and {@code content-type} headers.
     *
     * @param message the message to send; a {@link Record} payload is unwrapped to its value
     * @param topic the target topic
     * @param metadata the outgoing Kafka metadata, may be {@code null}
     * @param ceMetadata the outgoing Cloud Event metadata, may be {@code null} in which case
     *        default metadata are built
     * @param configuration the connector configuration providing the default attribute values
     * @return the record to send
     * @throws IllegalArgumentException if the type or the source is set neither in the metadata
     *         nor in the configuration
     */
    public static ProducerRecord<?, ?> createBinaryRecord(Message<?> message, String topic, OutgoingKafkaRecordMetadata<?> metadata, OutgoingCloudEventMetadata<?> ceMetadata, KafkaConnectorOutgoingConfiguration configuration) {
        if (ceMetadata == null) {
            ceMetadata = OutgoingCloudEventMetadata.builder().build();
        }
        Integer partition = getPartition(metadata, configuration);
        Object key = getKey(message, metadata, ceMetadata, configuration);
        Long timestamp = getTimestamp(metadata);
        List<Header> headers = getHeaders(metadata);
        Optional<String> subject = getSubject(ceMetadata, configuration);
        Optional<String> contentType = getDataContentType(ceMetadata, configuration);
        Optional<URI> schema = getDataSchema(ceMetadata, configuration);

        // Mandatory attributes.
        addHeader(headers, KAFKA_HEADER_FOR_SPEC_VERSION, ceMetadata.getSpecVersion());
        addHeader(headers, KAFKA_HEADER_FOR_ID, ceMetadata.getId());
        addHeader(headers, KAFKA_HEADER_FOR_TYPE, getType(ceMetadata, configuration));
        addHeader(headers, KAFKA_HEADER_FOR_SOURCE, getSource(ceMetadata, configuration));

        // Optional attributes.
        subject.ifPresent(s -> addHeader(headers, KAFKA_HEADER_FOR_SUBJECT, s));
        contentType.ifPresent(s -> {
            addHeader(headers, KAFKA_HEADER_FOR_CONTENT_TYPE, s);
            addHeader(headers, KAFKA_HEADER_CONTENT_TYPE, s);
        });
        schema.ifPresent(s -> addHeader(headers, KAFKA_HEADER_FOR_SCHEMA, s.toString()));

        Optional<ZonedDateTime> eventTime = ceMetadata.getTimeStamp();
        if (eventTime.isPresent()) {
            addHeader(headers, KAFKA_HEADER_FOR_TIME, RFC3339_DATE_FORMAT.format(eventTime.get()));
        } else if (timestamp != null) {
            // An Instant has no calendar fields or zone, so the formatter cannot print it directly
            // (UnsupportedTemporalTypeException); attach the UTC offset first.
            ZonedDateTime recordTime = Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC);
            addHeader(headers, KAFKA_HEADER_FOR_TIME, RFC3339_DATE_FORMAT.format(recordTime));
        } else if (configuration.getCloudEventsInsertTimestamp()) {
            addHeader(headers, KAFKA_HEADER_FOR_TIME, RFC3339_DATE_FORMAT.format(ZonedDateTime.now()));
        }

        // Extensions; null values are skipped.
        ceMetadata.getExtensions().forEach((name, value) -> {
            if (value != null) {
                addHeader(headers, CE_HEADER_PREFIX + name, value.toString());
            }
        });

        return new ProducerRecord<>(topic, partition, timestamp, key, unwrapPayload(message), headers);
    }

    /**
     * Appends a header whose value is the UTF-8 encoding of the given string.
     *
     * @param headers the list to append to
     * @param name the header name
     * @param value the header value, must not be {@code null}
     */
    private static void addHeader(List<Header> headers, String name, String value) {
        headers.add(new RecordHeader(name, value.getBytes(StandardCharsets.UTF_8)));
    }

    /**
     * Returns the message payload, or the value of the payload when it is a {@link Record}.
     *
     * @param message the message
     * @return the object to use as record value / event data
     */
    private static Object unwrapPayload(Message<?> message) {
        Object payload = message.getPayload();
        if (payload instanceof Record) {
            return ((Record<?, ?>) payload).value();
        }
        return payload;
    }

    /**
     * Resolves the event source from the Cloud Event metadata, falling back to the configuration.
     *
     * @throws IllegalArgumentException if the source is set in neither place
     */
    private static String getSource(OutgoingCloudEventMetadata<?> ceMetadata, KafkaConnectorOutgoingConfiguration configuration) {
        URI source = ceMetadata.getSource();
        if (source != null) {
            return source.toString();
        }
        return configuration.getCloudEventsSource()
                .orElseThrow(() -> new IllegalArgumentException("Cannot build the Cloud Event Record - source is not set"));
    }

    /**
     * Resolves the event type from the Cloud Event metadata, falling back to the configuration.
     *
     * @throws IllegalArgumentException if the type is set in neither place
     */
    private static String getType(OutgoingCloudEventMetadata<?> ceMetadata, KafkaConnectorOutgoingConfiguration configuration) {
        String type = ceMetadata.getType();
        if (type != null) {
            return type;
        }
        return configuration.getCloudEventsType()
                .orElseThrow(() -> new IllegalArgumentException("Cannot build the Cloud Event Record - type is not set"));
    }

    /** Resolves the subject from the Cloud Event metadata, falling back to the configuration. */
    private static Optional<String> getSubject(OutgoingCloudEventMetadata<?> ceMetadata, KafkaConnectorOutgoingConfiguration configuration) {
        Optional<String> subject = ceMetadata.getSubject();
        return subject.isPresent() ? subject : configuration.getCloudEventsSubject();
    }

    /** Resolves the data schema from the Cloud Event metadata, falling back to the configuration. */
    private static Optional<URI> getDataSchema(OutgoingCloudEventMetadata<?> ceMetadata, KafkaConnectorOutgoingConfiguration configuration) {
        Optional<URI> schema = ceMetadata.getDataSchema();
        return schema.isPresent() ? schema : configuration.getCloudEventsDataSchema().map(URI::create);
    }

    /** Resolves the data content type from the Cloud Event metadata, falling back to the configuration. */
    private static Optional<String> getDataContentType(OutgoingCloudEventMetadata<?> ceMetadata, KafkaConnectorOutgoingConfiguration configuration) {
        Optional<String> contentType = ceMetadata.getDataContentType();
        return contentType.isPresent() ? contentType : configuration.getCloudEventsDataContentType();
    }

    /**
     * Copies the headers of the outgoing Kafka metadata into a new mutable list.
     *
     * @param metadata the outgoing metadata, may be {@code null}
     * @return a new list, empty when there is no metadata or no headers
     */
    private static List<Header> getHeaders(OutgoingKafkaRecordMetadata<?> metadata) {
        List<Header> headers = new ArrayList<>();
        if (metadata != null && metadata.getHeaders() != null) {
            metadata.getHeaders().forEach(headers::add);
        }
        return headers;
    }

    /**
     * Extracts the record timestamp, in epoch milliseconds, from the outgoing Kafka metadata.
     *
     * @param metadata the outgoing metadata, may be {@code null}
     * @return the timestamp, or {@code null} when it is not set or not strictly positive
     */
    private static Long getTimestamp(OutgoingKafkaRecordMetadata<?> metadata) {
        if (metadata == null || metadata.getTimestamp() == null) {
            return null;
        }
        long epochMillis = metadata.getTimestamp().toEpochMilli();
        return epochMillis > 0L ? epochMillis : null;
    }

    /**
     * Resolves the record key. The first available of the following is used: the key of the
     * outgoing Kafka metadata, the key of a {@link Record} payload (even if {@code null}), the
     * {@code partitionkey} extension, the key from the configuration.
     *
     * @return the key, possibly {@code null}
     */
    private static Object getKey(Message<?> message, OutgoingKafkaRecordMetadata<?> metadata, OutgoingCloudEventMetadata<?> ceMetadata, KafkaConnectorOutgoingConfiguration configuration) {
        if (metadata != null && metadata.getKey() != null) {
            return metadata.getKey();
        }
        Object payload = message.getPayload();
        if (payload instanceof Record) {
            return ((Record<?, ?>) payload).key();
        }
        return ceMetadata.getExtension(CE_EXTENSION_KAFKA_KEY).orElse(configuration.getKey().orElse(null));
    }

    /**
     * Resolves the target partition: the one from the outgoing Kafka metadata unless it is
     * {@code -1}, otherwise the one from the configuration.
     *
     * @return the partition, or {@code null} when the resolved value is negative
     */
    private static Integer getPartition(OutgoingKafkaRecordMetadata<?> metadata, KafkaConnectorOutgoingConfiguration configuration) {
        int partition = configuration.getPartition();
        if (metadata != null && metadata.getPartition() != -1) {
            partition = metadata.getPartition();
        }
        return partition < 0 ? null : partition;
    }

    /**
     * Creates a producer record encoding the message as a structured Cloud Event: the record
     * value is a JSON document containing the attributes, the extensions and the data.
     *
     * <p>A {@code content-type} header set to {@link #STRUCTURED_CONTENT_TYPE} is added unless
     * the headers from {@code metadata} already contain one (the name is compared ignoring the
     * case). The {@code time} attribute is taken from the Cloud Event metadata, or is the current
     * time when the configuration asks for timestamp insertion. A {@link String} payload is
     * stored as is; any other payload is mapped with {@link JsonObject#mapFrom(Object)}.
     *
     * @param message the message to send; a {@link Record} payload is unwrapped to its value
     * @param topic the target topic
     * @param metadata the outgoing Kafka metadata, may be {@code null}
     * @param ceMetadata the outgoing Cloud Event metadata, may be {@code null} in which case
     *        default metadata are built
     * @param configuration the connector configuration providing the default attribute values
     * @return the record to send, whose value is the encoded JSON document
     * @throws IllegalArgumentException if the type or the source is set neither in the metadata
     *         nor in the configuration
     */
    public static ProducerRecord<?, ?> createStructuredRecord(Message<?> message, String topic, OutgoingKafkaRecordMetadata<?> metadata, OutgoingCloudEventMetadata<?> ceMetadata, KafkaConnectorOutgoingConfiguration configuration) {
        if (ceMetadata == null) {
            ceMetadata = OutgoingCloudEventMetadata.builder().build();
        }
        Integer partition = getPartition(metadata, configuration);
        Object key = getKey(message, metadata, ceMetadata, configuration);
        Long timestamp = getTimestamp(metadata);
        List<Header> headers = getHeaders(metadata);
        String source = getSource(ceMetadata, configuration);
        String type = getType(ceMetadata, configuration);
        Optional<String> subject = getSubject(ceMetadata, configuration);
        Optional<String> dataContentType = getDataContentType(ceMetadata, configuration);
        Optional<URI> schema = getDataSchema(ceMetadata, configuration);

        // Do not override a content type explicitly set by the user.
        boolean hasContentType = headers.stream().anyMatch(h -> h.key().equalsIgnoreCase(KAFKA_HEADER_CONTENT_TYPE));
        if (!hasContentType) {
            addHeader(headers, KAFKA_HEADER_CONTENT_TYPE, STRUCTURED_CONTENT_TYPE);
        }

        JsonObject json = new JsonObject();
        json.put("specversion", ceMetadata.getSpecVersion())
                .put("type", type)
                .put("source", source)
                .put("id", ceMetadata.getId());

        ZonedDateTime time = ceMetadata.getTimeStamp().orElse(null);
        if (time != null) {
            json.put("time", time.toInstant());
        } else if (configuration.getCloudEventsInsertTimestamp()) {
            json.put("time", Instant.now());
        }

        // Store the schema as a string: a URI is not a JSON type and whether it can be stored or
        // encoded depends on the Vert.x version / JSON codec in use.
        schema.ifPresent(s -> json.put("dataschema", s.toString()));
        dataContentType.ifPresent(s -> json.put("datacontenttype", s));
        subject.ifPresent(s -> json.put("subject", s));
        ceMetadata.getExtensions().forEach(json::put);

        Object payload = unwrapPayload(message);
        if (payload instanceof String) {
            json.put("data", payload);
        } else {
            json.put("data", JsonObject.mapFrom(payload));
        }
        return new ProducerRecord<Object, String>(topic, partition, timestamp, key, json.encode(), headers);
    }

    /**
     * Detects how (and whether) the given record encodes a Cloud Event.
     *
     * @param record the consumed record
     * @return {@link CloudEventMode#STRUCTURED} when the {@code content-type} header starts with
     *         {@link #CE_CONTENT_TYPE_PREFIX}; otherwise {@link CloudEventMode#BINARY} when the
     *         {@code ce_id}, {@code ce_source}, {@code ce_type} and {@code ce_specversion}
     *         headers are all present; otherwise {@link CloudEventMode#NOT_A_CLOUD_EVENT}
     */
    public static CloudEventMode getCloudEventMode(ConsumerRecord<?, ?> record) {
        String contentType = getHeader(KAFKA_HEADER_CONTENT_TYPE, record);
        if (contentType != null && contentType.startsWith(CE_CONTENT_TYPE_PREFIX)) {
            return CloudEventMode.STRUCTURED;
        }
        if (containsAllMandatoryAttributes(record)) {
            return CloudEventMode.BINARY;
        }
        return CloudEventMode.NOT_A_CLOUD_EVENT;
    }

    /** Checks that the id, source, type and spec version headers are all present. */
    private static boolean containsAllMandatoryAttributes(ConsumerRecord<?, ?> record) {
        return getHeader(KAFKA_HEADER_FOR_ID, record) != null
                && getHeader(KAFKA_HEADER_FOR_SOURCE, record) != null
                && getHeader(KAFKA_HEADER_FOR_TYPE, record) != null
                && getHeader(KAFKA_HEADER_FOR_SPEC_VERSION, record) != null;
    }

    /**
     * Reads a header as a UTF-8 string.
     *
     * @param name the exact (case-sensitive) header name
     * @param record the consumed record
     * @return the value of the first header with that name and a non-null value, or {@code null}
     *         when there is none
     */
    private static String getHeader(String name, ConsumerRecord<?, ?> record) {
        Headers headers = record.headers();
        for (Header header : headers) {
            // Null-valued headers are treated as absent instead of failing with a NullPointerException.
            if (header.key().equals(name) && header.value() != null) {
                return new String(header.value(), StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    /** The way a Kafka record encodes a Cloud Event, if it does. */
    public enum CloudEventMode {
        /** The record value is the JSON representation of the whole event. */
        STRUCTURED,
        /** The event attributes are stored in the record headers. */
        BINARY,
        /** The record was not recognized as a Cloud Event. */
        NOT_A_CLOUD_EVENT;
    }
}

