/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.consumer;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kz.greetgo.kafka.consumer.InnerProducer;
import kz.greetgo.kafka.consumer.InnerProducerSender;
import kz.greetgo.kafka.consumer.InvokeSessionContext;
import kz.greetgo.kafka.consumer.Invoker;
import kz.greetgo.kafka.consumer.ParameterValueReader;
import kz.greetgo.kafka.consumer.annotations.Author;
import kz.greetgo.kafka.consumer.annotations.ConsumerName;
import kz.greetgo.kafka.consumer.annotations.InnerProducerName;
import kz.greetgo.kafka.consumer.annotations.KafkaCommitOn;
import kz.greetgo.kafka.consumer.annotations.Offset;
import kz.greetgo.kafka.consumer.annotations.Partition;
import kz.greetgo.kafka.consumer.annotations.Timestamp;
import kz.greetgo.kafka.consumer.annotations.ToTopic;
import kz.greetgo.kafka.consumer.annotations.Topic;
import kz.greetgo.kafka.consumer.parameters.InnerProducerSenderValueReader;
import kz.greetgo.kafka.consumer.parameters.InnerProducerValueReader;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.errors.AbsentAnnotationToTopicOverInnerProducer;
import kz.greetgo.kafka.errors.IllegalParameterType;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.producer.KafkaFuture;
import kz.greetgo.kafka.producer.ProducerFacade;
import kz.greetgo.kafka.util.AnnotationUtil;
import kz.greetgo.kafka.util.GenericUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class InvokerBuilder {
    private final Object controller;
    private final Method method;
    private final Logger logger;

    public InvokerBuilder(Object controller, Method method, Logger logger) {
        this.controller = controller;
        this.method = method;
        this.logger = logger;
    }

    public Invoker build() {
        Topic topic = AnnotationUtil.getAnnotation(this.method, Topic.class);
        if (topic == null) {
            throw new IllegalStateException("No annotation Topic for " + this.method);
        }
        String tmpConsumerName = this.method.getName();
        ConsumerName annotation = AnnotationUtil.getAnnotation(this.method, ConsumerName.class);
        if (annotation != null) {
            tmpConsumerName = annotation.value();
        }
        final String consumerName = tmpConsumerName;
        Class[] tmpCommitOn = new Class[]{};
        final Class[] commitOn = AnnotationUtil.getAnnotation(this.method, KafkaCommitOn.class);
        if (commitOn != null) {
            tmpCommitOn = commitOn.value();
        }
        commitOn = tmpCommitOn;
        InnerProducerName parentProducerName = AnnotationUtil.getAnnotation(this.method, InnerProducerName.class);
        if (parentProducerName == null) {
            parentProducerName = AnnotationUtil.getAnnotation(this.controller.getClass(), InnerProducerName.class);
        }
        final Set topicSet = Arrays.stream(topic.value()).collect(Collectors.toSet());
        Type[] parameterTypes = this.method.getGenericParameterTypes();
        Annotation[][] parameterAnnotations = AnnotationUtil.getParameterAnnotations(this.method);
        assert (parameterTypes.length == parameterAnnotations.length);
        final int parametersCount = parameterTypes.length;
        final ParameterValueReader[] parameterValueReaders = new ParameterValueReader[parametersCount];
        for (int i = 0; i < parametersCount; ++i) {
            parameterValueReaders[i] = this.createParameterValueReader(parameterTypes[i], parameterAnnotations[i], parentProducerName);
        }
        Class<?> tmpGettingBodyClass = null;
        for (ParameterValueReader parameterValueReader : parameterValueReaders) {
            Class<?> aClass = parameterValueReader.gettingBodyClass();
            if (aClass == null) continue;
            tmpGettingBodyClass = aClass;
        }
        final Class<?> gettingBodyClass = tmpGettingBodyClass;
        final HashSet<String> usingProducerNames = new HashSet<String>();
        for (ParameterValueReader parameterValueReader : parameterValueReaders) {
            usingProducerNames.addAll(parameterValueReader.getProducerNames());
        }
        return new Invoker(){

            @Override
            public Set<String> getUsingProducerNames() {
                return usingProducerNames;
            }

            @Override
            public Invoker.InvokeSession createSession() {
                return new Invoker.InvokeSession(){
                    private final InvokeSessionContext context = new InvokeSessionContext();

                    @Override
                    public void putProducer(String producerName, ProducerFacade producer) {
                        this.context.putProducer(producerName, producer);
                    }

                    @Override
                    public Invoker.InvokeResult invoke(ConsumerRecords<byte[], Box> records) {
                        boolean needToCommit = true;
                        Throwable lastInvokeError = null;
                        ArrayList<KafkaFuture> kafkaFutures = new ArrayList<KafkaFuture>();
                        for (ConsumerRecord record : records) {
                            Throwable error;
                            if (!this.isInFilter((ConsumerRecord<byte[], Box>)record)) continue;
                            this.context.kafkaFutures.clear();
                            Object[] parameters = new Object[parametersCount];
                            for (int i = 0; i < parametersCount; ++i) {
                                parameters[i] = parameterValueReaders[i].read((ConsumerRecord<byte[], Box>)record, this.context);
                            }
                            Invoker.InvokeResult invokeResult = this.invokeMethod(parameters);
                            if (!invokeResult.needToCommit()) {
                                needToCommit = false;
                            }
                            if ((error = invokeResult.exceptionInMethod()) != null) {
                                lastInvokeError = error;
                            }
                            for (int i = 0; i < parametersCount; ++i) {
                                kafkaFutures.addAll(this.context.kafkaFutures);
                            }
                            this.context.kafkaFutures.clear();
                        }
                        kafkaFutures.stream().filter(Objects::nonNull).forEach(KafkaFuture::awaitAndGet);
                        return InvokerBuilder.this.newInvokeResult(needToCommit, lastInvokeError);
                    }

                    private Invoker.InvokeResult invokeMethod(Object[] parameters) {
                        try {
                            InvokerBuilder.this.method.invoke(InvokerBuilder.this.controller, parameters);
                            return InvokerBuilder.this.invokeResultOk();
                        }
                        catch (IllegalAccessException e) {
                            if (InvokerBuilder.this.logger.isShow(LoggerType.LOG_CONSUMER_ILLEGAL_ACCESS_EXCEPTION_INVOKING_METHOD)) {
                                InvokerBuilder.this.logger.logConsumerIllegalAccessExceptionInvokingMethod(e, consumerName, InvokerBuilder.this.controller, InvokerBuilder.this.method);
                            }
                            return InvokerBuilder.this.newInvokeResult(false, e);
                        }
                        catch (InvocationTargetException e) {
                            Throwable error = e.getTargetException();
                            if (InvokerBuilder.this.logger.isShow(LoggerType.LOG_CONSUMER_ERROR_IN_METHOD)) {
                                InvokerBuilder.this.logger.logConsumerErrorInMethod(error, consumerName, InvokerBuilder.this.controller, InvokerBuilder.this.method);
                            }
                            for (Class aClass : commitOn) {
                                if (!aClass.isInstance(error)) continue;
                                return InvokerBuilder.this.newInvokeResult(true, error);
                            }
                            return InvokerBuilder.this.newInvokeResult(false, error);
                        }
                    }

                    @Override
                    public void close() {
                        this.context.close();
                    }
                };
            }

            boolean isInFilter(ConsumerRecord<byte[], Box> record) {
                Box box;
                if (!topicSet.contains(record.topic())) {
                    return false;
                }
                if (gettingBodyClass != null) {
                    if (gettingBodyClass == Box.class) {
                        return true;
                    }
                    if (!gettingBodyClass.isInstance(((Box)record.value()).body)) {
                        return false;
                    }
                }
                if ((box = (Box)record.value()) == null) {
                    return false;
                }
                List<String> ignorableConsumers = box.ignorableConsumers;
                return ignorableConsumers == null || !ignorableConsumers.contains(consumerName);
            }

            @Override
            public boolean isAutoCommit() {
                return false;
            }

            @Override
            public String getConsumerName() {
                return consumerName;
            }
        };
    }

    private Invoker.InvokeResult newInvokeResult(final boolean needToCommit, final Throwable exceptionInMethod) {
        return new Invoker.InvokeResult(){

            @Override
            public boolean needToCommit() {
                return needToCommit;
            }

            @Override
            public Throwable exceptionInMethod() {
                return exceptionInMethod;
            }
        };
    }

    private Invoker.InvokeResult invokeResultOk() {
        return this.newInvokeResult(true, null);
    }

    private ParameterValueReader createParameterValueReader(final Type parameterType, Annotation[] parameterAnnotations, InnerProducerName parentProducerName) {
        InnerProducerName producerName = parentProducerName;
        ToTopic toTopic = null;
        AtomicReference<String> finalProducerName = new AtomicReference<String>("default_inner_producer");
        for (Annotation annotation : parameterAnnotations) {
            if (annotation instanceof Partition) {
                if (!GenericUtil.isOfClass(parameterType, Integer.TYPE) && !GenericUtil.isOfClass(parameterType, Integer.class)) {
                    throw new IllegalParameterType("Parameter with @Partition must be `int` or `Integer`");
                }
                return (record, invokeSessionContext) -> record.partition();
            }
            if (annotation instanceof Offset) {
                if (!GenericUtil.isOfClass(parameterType, Long.TYPE) && !GenericUtil.isOfClass(parameterType, Long.class)) {
                    throw new IllegalParameterType("Parameter with @Offset must be `long` or `Long`");
                }
                return (record, invokeSessionContext) -> record.offset();
            }
            if (annotation instanceof Timestamp) {
                if (GenericUtil.isOfClass(parameterType, Date.class)) {
                    return (record, invokeSessionContext) -> new Date(record.timestamp());
                }
                if (GenericUtil.isOfClass(parameterType, Long.TYPE) || GenericUtil.isOfClass(parameterType, Long.class)) {
                    return (record, invokeSessionContext) -> record.timestamp();
                }
                throw new IllegalParameterType("Parameter with @Offset must be `long` or `Long` or `java.util.Date`");
            }
            if (annotation instanceof Author) {
                if (!GenericUtil.isOfClass(parameterType, String.class)) {
                    throw new IllegalParameterType("Parameter with @Author must be `String`");
                }
                return (record, invokeSessionContext) -> ((Box)record.value()).author;
            }
            if (annotation instanceof InnerProducerName) {
                producerName = (InnerProducerName)annotation;
            }
            if (!(annotation instanceof ToTopic)) continue;
            toTopic = (ToTopic)annotation;
        }
        if (GenericUtil.isOfClass(parameterType, Box.class)) {
            return (record, invokeSessionContext) -> (Box)record.value();
        }
        if (GenericUtil.isOfClass(parameterType, InnerProducerSender.class)) {
            if (producerName != null) {
                finalProducerName.set(producerName.value());
            }
            return new InnerProducerSenderValueReader(finalProducerName.get());
        }
        if (GenericUtil.isOfClass(parameterType, InnerProducer.class)) {
            if (producerName != null) {
                finalProducerName.set(producerName.value());
            }
            if (toTopic == null) {
                throw new AbsentAnnotationToTopicOverInnerProducer();
            }
            return new InnerProducerValueReader(finalProducerName.get(), toTopic.value());
        }
        return new ParameterValueReader(){

            @Override
            public Object read(ConsumerRecord<byte[], Box> record, InvokeSessionContext invokeSessionContext) {
                return ((Box)record.value()).body;
            }

            @Override
            public Class<?> gettingBodyClass() {
                return GenericUtil.extractClass(parameterType);
            }
        };
    }
}

