/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.consumer;

import java.util.ArrayList;
import java.util.Objects;
import kz.greetgo.kafka.consumer.InvokeResult;
import kz.greetgo.kafka.consumer.InvokeSession;
import kz.greetgo.kafka.consumer.InvokeSessionContext;
import kz.greetgo.kafka.consumer.InvokeSessionFactory;
import kz.greetgo.kafka.consumer.Invoker;
import kz.greetgo.kafka.consumer.Profile;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.producer.KafkaFuture;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
 * {@link InvokeSessionFactory} that creates sessions which push every record of a
 * polled batch through a single {@link Invoker}.
 */
public class InvokeSessionFactoryImpl
implements InvokeSessionFactory {
    private final Invoker invoker;
    private final Logger logger;

    /**
     * @param logger  receives invocation errors when
     *                {@link LoggerType#CONSUMER_LOG_DIRECT_INVOKE_ERROR} is enabled
     * @param invoker filters and handles each individual record
     */
    public InvokeSessionFactoryImpl(Logger logger, Invoker invoker) {
        this.invoker = invoker;
        this.logger = logger;
    }

    /**
     * Creates a new session that owns its own {@link InvokeSessionContext}.
     *
     * @return a session; its {@code close()} closes the owned context
     */
    @Override
    public InvokeSession createSession() {
        return new InvokeSession() {
            private final InvokeSessionContext context = new InvokeSessionContext();

            /**
             * Invokes the handler for every record of the batch that passes the invoker's filter,
             * then calls {@code awaitAndGet} on every non-null future collected during the batch.
             *
             * @param records batch of records to process
             * @param profile passed through unchanged to {@link Invoker#invoke}
             * @return a result whose commit flag is {@code true} only if no invoked record
             *         reported {@code needToCommit() == false}, and whose error is the last
             *         non-null invocation error seen in the batch ({@code null} if none)
             */
            @Override
            public InvokeResult invoke(ConsumerRecords<byte[], Object> records, Profile profile) {
                boolean needToCommit = true;
                Throwable lastInvokeError = null;
                ArrayList<KafkaFuture> kafkaFutures = new ArrayList<>();

                // ConsumerRecords<K, V> is Iterable<ConsumerRecord<K, V>>, so the loop variable
                // can be fully typed; no raw type or unchecked cast is required.
                for (ConsumerRecord<byte[], Object> record : records) {
                    if (!invoker.isInFilter(record)) {
                        continue;
                    }

                    // The context lives as long as the session, so it may still hold futures
                    // from an earlier call that exited abnormally; start from an empty buffer.
                    context.kafkaFutures.clear();

                    InvokeResult invokeResult = invoker.invoke(record, context, profile);

                    // A single record refusing the commit vetoes it for the whole batch.
                    if (!invokeResult.needToCommit()) {
                        needToCommit = false;
                    }

                    Throwable error = invokeResult.invocationError();
                    if (error != null) {
                        // Only the most recent error is reported back to the caller.
                        lastInvokeError = error;
                        if (logger.isShow(LoggerType.CONSUMER_LOG_DIRECT_INVOKE_ERROR)) {
                            logger.logConsumerLogDirectInvokeError(error);
                        }
                    }

                    // Move this record's futures into the batch-wide list and reset the buffer.
                    kafkaFutures.addAll(context.kafkaFutures);
                    context.kafkaFutures.clear();
                }

                // NOTE(review): awaitAndGet presumably blocks until the send completes and
                // propagates its failure - confirm against KafkaFuture.
                kafkaFutures.stream().filter(Objects::nonNull).forEach(KafkaFuture::awaitAndGet);

                return InvokeResult.of(needToCommit, lastInvokeError);
            }

            /** Closes the session-scoped context. */
            @Override
            public void close() {
                context.close();
            }
        };
    }
}

