/*
 * Decompiled with CFR 0.152.
 */
package pl.allegro.tech.hermes.consumers.consumer.receiver.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import pl.allegro.tech.hermes.common.message.wrapper.UnwrappedMessageContent;
import pl.allegro.tech.hermes.consumers.consumer.receiver.RetryableReceiverError;
import pl.allegro.tech.hermes.consumers.consumer.receiver.SchemaExistenceEnsurer;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.KafkaHeaderExtractor;
import pl.allegro.tech.hermes.consumers.consumer.receiver.kafka.MessageContentReader;
import pl.allegro.tech.hermes.schema.SchemaId;
import pl.allegro.tech.hermes.schema.SchemaVersion;

/**
 * {@link MessageContentReader} that unwraps the payload of a Kafka record according to the
 * content type passed in by the caller.
 *
 * <ul>
 *   <li>{@code AVRO}: the schema version and schema id are extracted from the record headers,
 *       every non-null one is checked through the {@link SchemaExistenceEnsurer}, and the
 *       payload is then handed to {@code unwrapAvro} together with the topic and both values.</li>
 *   <li>{@code JSON}: the payload is handed directly to {@code unwrapJson}.</li>
 *   <li>Anything else is rejected with {@link UnsupportedContentTypeException}.</li>
 * </ul>
 */
class BasicMessageContentReader implements MessageContentReader {
    private final CompositeMessageContentWrapper compositeMessageContentWrapper;
    private final KafkaHeaderExtractor kafkaHeaderExtractor;
    private final Topic topic;
    private final SchemaExistenceEnsurer schemaExistenceEnsurer;

    /**
     * Stores the collaborators as-is; no validation is performed here.
     *
     * @param compositeMessageContentWrapper wrapper used to unwrap AVRO and JSON payloads
     * @param kafkaHeaderExtractor extractor used to read schema version/id from record headers
     * @param topic topic passed to the wrapper, the schema checks and the unsupported-type error
     * @param schemaExistenceEnsurer component asked to confirm a schema exists before unwrapping
     */
    BasicMessageContentReader(CompositeMessageContentWrapper compositeMessageContentWrapper, KafkaHeaderExtractor kafkaHeaderExtractor, Topic topic, SchemaExistenceEnsurer schemaExistenceEnsurer) {
        this.compositeMessageContentWrapper = compositeMessageContentWrapper;
        this.kafkaHeaderExtractor = kafkaHeaderExtractor;
        this.topic = topic;
        this.schemaExistenceEnsurer = schemaExistenceEnsurer;
    }

    /**
     * Unwraps the value of {@code message} using the strategy matching {@code contentType}.
     *
     * @param message Kafka record whose value (and, for AVRO, headers) is read
     * @param contentType content type deciding how the value is unwrapped
     * @return the unwrapped content produced by the composite wrapper
     * @throws UnsupportedContentTypeException if {@code contentType} is neither AVRO nor JSON
     *         (a {@code null} content type ends up here as well)
     * @throws RetryableReceiverError if, for AVRO, the schema ensurer signals the schema is not loaded
     */
    @Override
    public UnwrappedMessageContent read(ConsumerRecord<byte[], byte[]> message, ContentType contentType) {
        if (contentType == ContentType.JSON) {
            return compositeMessageContentWrapper.unwrapJson(message.value());
        }
        if (contentType != ContentType.AVRO) {
            throw new UnsupportedContentTypeException(topic);
        }
        return readAvro(message);
    }

    /** Handles the AVRO path: resolve schema coordinates from headers, verify them, then unwrap. */
    private UnwrappedMessageContent readAvro(ConsumerRecord<byte[], byte[]> message) {
        Integer versionFromHeader = kafkaHeaderExtractor.extractSchemaVersion(message.headers());
        Integer idFromHeader = kafkaHeaderExtractor.extractSchemaId(message.headers());
        // The existence check runs before the payload is touched, so a missing schema
        // surfaces as a RetryableReceiverError rather than from inside the unwrap call.
        ensureExistence(versionFromHeader, idFromHeader);
        return compositeMessageContentWrapper.unwrapAvro(message.value(), topic, idFromHeader, versionFromHeader);
    }

    /**
     * Asks the schema ensurer to confirm the schema exists, first by version and then by id.
     * A {@code null} argument skips the corresponding check.
     *
     * @throws RetryableReceiverError wrapping the {@code SchemaNotLoaded} raised by the ensurer
     */
    private void ensureExistence(Integer schemaVersion, Integer schemaId) {
        try {
            if (schemaVersion != null) {
                SchemaVersion version = SchemaVersion.valueOf(schemaVersion.intValue());
                schemaExistenceEnsurer.ensureSchemaExists(topic, version);
            }
            if (schemaId != null) {
                SchemaId id = SchemaId.valueOf(schemaId.intValue());
                schemaExistenceEnsurer.ensureSchemaExists(topic, id);
            }
        } catch (SchemaExistenceEnsurer.SchemaNotLoaded notLoaded) {
            throw new RetryableReceiverError("Requested schema not present yet...", notLoaded);
        }
    }
}

