/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.universaldb.distribute;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.universaldb.SchemaStats;
import org.teamapps.universaldb.distribute.ClusterSetConfig;
import org.teamapps.universaldb.distribute.PacketDataMingling;
import org.teamapps.universaldb.distribute.TransactionExecutionResult;
import org.teamapps.universaldb.distribute.TransactionMessageKey;
import org.teamapps.universaldb.index.DataBaseMapper;
import org.teamapps.universaldb.transaction.ClusterTransaction;
import org.teamapps.universaldb.transaction.TransactionIdHandler;
import org.teamapps.universaldb.transaction.TransactionPacket;

/**
 * Reads the "resolved" transaction topic of a cluster and feeds it into the local database.
 *
 * <p>On construction the reader assigns itself to partition 0 of the topic
 * {@code <topicPrefix>-resolved}, seeks to {@link SchemaStats#getTransactionOffset()} and starts a
 * background thread that polls the topic in an endless loop. For every record it:
 * <ul>
 *   <li>completes the pending {@link TransactionExecutionResult} (taken from the shared transaction
 *       map) when the record was issued by this client,</li>
 *   <li>executes the transaction locally when the record's master client id differs from this
 *       node's master client id and the transaction id is the next expected one,</li>
 *   <li>stores the next offsets to read in {@link SchemaStats}.</li>
 * </ul>
 */
public class TransactionReader {
    public static final String RESOLVED_SUFFIX = "resolved";
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String clientId;
    private final String masterProducerClientId;
    private final String sharedSecret;
    private final SchemaStats schemaStats;
    private final DataBaseMapper dataBaseMapper;
    private final Consumer<byte[], byte[]> consumer;
    private final String topic;
    private final TopicPartition topicPartition;
    private final Map<Long, TransactionExecutionResult> transactionMap;
    private final TransactionIdHandler transactionIdHandler;

    /**
     * Creates the reader, positions the Kafka consumer and starts the polling thread.
     *
     * @param clusterConfig        supplies the shared secret, the Kafka bootstrap servers and the topic prefix
     * @param schemaStats          supplies the client ids and the start offset; receives the updated offsets
     * @param dataBaseMapper       passed to every {@link ClusterTransaction} built from a record
     * @param transactionMap       pending results of locally issued transactions; entries are removed
     *                             once the matching record has been read
     * @param transactionIdHandler source of the last committed transaction id and target of executed transactions
     */
    public TransactionReader(ClusterSetConfig clusterConfig, SchemaStats schemaStats, DataBaseMapper dataBaseMapper, Map<Long, TransactionExecutionResult> transactionMap, TransactionIdHandler transactionIdHandler) {
        this.sharedSecret = clusterConfig.getSharedSecret();
        this.schemaStats = schemaStats;
        this.dataBaseMapper = dataBaseMapper;
        this.transactionMap = transactionMap;
        this.transactionIdHandler = transactionIdHandler;
        this.clientId = schemaStats.getClientId();
        this.masterProducerClientId = schemaStats.getMasterClientId();

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", clusterConfig.getKafkaConfig());
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        // Offsets are tracked in SchemaStats, not committed to Kafka.
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("max.poll.records", 1000000);
        consumerProps.put("max.partition.fetch.bytes", 30 * 1024 * 1024);
        consumerProps.put("session.timeout.ms", 30000);
        consumerProps.put("heartbeat.interval.ms", 9000);

        logger.info("Start with transaction offset:{}", schemaStats.getTransactionOffset());
        this.topic = clusterConfig.getTopicPrefix() + "-" + RESOLVED_SUFFIX;
        this.topicPartition = new TopicPartition(this.topic, 0);
        this.consumer = new KafkaConsumer<>(consumerProps);
        this.consumer.assign(Collections.singletonList(this.topicPartition));
        this.consumer.seek(this.topicPartition, schemaStats.getTransactionOffset());
        new Thread(this::start).start();
    }

    /**
     * Polling loop of the background thread. An {@link IOException} raised while consuming is
     * logged and the loop continues with the next poll; any other exception ends the thread.
     */
    private void start() {
        while (true) {
            try {
                consume();
            } catch (IOException e) {
                // NOTE(review): the records of the current batch that follow the failed one are not
                // re-read, because the consumer position has already moved past the batch - confirm
                // whether a seek back to the stored offset is wanted here.
                logger.error("Error while consuming transactions from topic {}", topic, e);
            }
        }
    }

    /**
     * Polls the topic partition once (waiting up to one second) and processes the returned
     * records in order.
     *
     * @throws IOException if processing a record fails
     */
    private void consume() throws IOException {
        ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofSeconds(1L));
        List<ConsumerRecord<byte[], byte[]>> records = polled.records(topicPartition);
        for (ConsumerRecord<byte[], byte[]> record : records) {
            processRecord(record);
        }
    }

    /**
     * Handles a single record: decodes it, completes a pending local result, executes the
     * transaction if required and stores the follow-up offsets.
     *
     * @param record the record read from the resolved topic
     * @throws IOException if the record cannot be decoded or executed
     */
    private void processRecord(ConsumerRecord<byte[], byte[]> record) throws IOException {
        TransactionMessageKey messageKey = new TransactionMessageKey(record.key());
        byte[] value = record.value();

        // A record without payload is reported as an error to the waiting caller below. Decoding is
        // skipped for it so that this branch is actually reached.
        // NOTE(review): how PacketDataMingling.mingle treats a null payload is not visible here.
        ClusterTransaction transaction = null;
        if (value != null) {
            byte[] bytes = PacketDataMingling.mingle(value, sharedSecret, messageKey.getPacketKey());
            transaction = new ClusterTransaction(new TransactionPacket(bytes), dataBaseMapper);
        }

        if (messageKey.getClientId().equals(clientId)) {
            // The record answers a transaction issued by this client: complete the pending result.
            TransactionExecutionResult executionResult = transactionMap.remove(messageKey.getTransactionKeyOfCallingNode());
            if (executionResult != null) {
                if (transaction != null) {
                    executionResult.handleSuccess(transaction.getRecordIdByCorrelationId());
                } else {
                    executionResult.handleError();
                }
            }
        }

        // Only records whose master client id differs from this node's master client id are executed
        // here - presumably the master has already applied its own transactions; TODO confirm.
        if (!messageKey.getMasterClientId().equals(masterProducerClientId)) {
            if (transaction == null) {
                logger.warn("Transaction record without payload, key:{}", messageKey);
            } else {
                long expectedId = transactionIdHandler.getLastCommittedTransactionId() + 1L;
                if (transaction.getTransactionId() == expectedId) {
                    transaction.executeResolvedTransaction(transactionIdHandler);
                } else {
                    logger.warn("Transaction with wrong transaction id! Expected id:{}, actual id:{}, key:{}", expectedId, transaction.getTransactionId(), messageKey);
                }
            }
        }

        // Store the positions to resume from: the next record of this topic and of the master topic.
        schemaStats.setTransactionOffset(record.offset() + 1L);
        schemaStats.setMasterTransactionOffset(messageKey.getMasterOffset() + 1L);
    }
}

