/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.universaldb.distribute;

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teamapps.universaldb.SchemaStats;
import org.teamapps.universaldb.distribute.ClusterSetConfig;
import org.teamapps.universaldb.distribute.PacketDataMingling;
import org.teamapps.universaldb.distribute.TransactionMessageKey;
import org.teamapps.universaldb.index.DataBaseMapper;
import org.teamapps.universaldb.transaction.ClusterTransaction;
import org.teamapps.universaldb.transaction.TransactionIdHandler;
import org.teamapps.universaldb.transaction.TransactionPacket;

/**
 * Cluster transaction master.
 *
 * <p>Every instance joins a Curator leader election on {@value #LEADER_SELECTION_NODE}. While an
 * instance holds leadership it polls partition 0 of the {@code <topicPrefix>-unresolved} Kafka
 * topic, resolves and executes each received transaction via {@link ClusterTransaction}, and
 * publishes the outcome to the {@code <topicPrefix>-resolved} topic.
 *
 * <p>Threading: {@link #takeLeadership(CuratorFramework)} runs on a thread owned by the
 * {@link LeaderSelector}; {@link #stateChanged(CuratorFramework, ConnectionState)} and
 * {@link #close()} may be invoked from other threads and communicate with the leadership loop
 * through the volatile {@code masterRole} flag.
 */
public class TransactionMaster implements LeaderSelectorListener, Closeable, Callback {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String LEADER_SELECTION_NODE = "/leaderSelection/leader";
    private final String sharedSecret;
    private final ClusterSetConfig clusterConfig;
    private final SchemaStats schemaStats;
    private final DataBaseMapper dataBaseMapper;
    private final TransactionIdHandler transactionIdHandler;
    /** Consumer of the "-unresolved" topic; only used from the leadership thread after construction. */
    private final Consumer<byte[], byte[]> consumer;
    /** Producer for the "-resolved" topic. */
    private final KafkaProducer<byte[], byte[]> producer;
    /** ZooKeeper client backing the leader election; kept as a field so {@link #close()} can release it. */
    private final CuratorFramework curatorClient;
    private final LeaderSelector leaderSelector;
    private final String consumerTopic;
    private final TopicPartition consumerTopicPartition;
    private final String producerTopic;
    /** True while this instance is acting as master; cleared to make the leadership loop exit. */
    private volatile boolean masterRole;
    private final String masterProducerClientId;
    /** Counter feeding {@link #getNextKey()}; only touched from the leadership thread. */
    private long packetKey;

    /**
     * Creates the Kafka consumer and producer, then starts the ZooKeeper client and joins the
     * leader election. Leadership may be granted (on another thread) as soon as this returns.
     *
     * @param clusterConfig        supplies the Kafka and ZooKeeper connection strings, the topic
     *                             prefix and the shared secret
     * @param schemaStats          supplies the master transaction offset and the master client id
     * @param dataBaseMapper       passed to every {@link ClusterTransaction} that is created
     * @param transactionIdHandler passed to {@code resolveAndExecuteTransaction}
     */
    public TransactionMaster(ClusterSetConfig clusterConfig, SchemaStats schemaStats, DataBaseMapper dataBaseMapper, TransactionIdHandler transactionIdHandler) {
        this.clusterConfig = clusterConfig;
        this.sharedSecret = clusterConfig.getSharedSecret();
        this.schemaStats = schemaStats;
        this.dataBaseMapper = dataBaseMapper;
        this.transactionIdHandler = transactionIdHandler;

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", clusterConfig.getKafkaConfig());
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("max.poll.records", 1000000);
        // Offsets are not committed to Kafka; the position is set explicitly via seek().
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("auto.offset.reset", "earliest");
        logger.info("Start with master transaction offset:" + schemaStats.getMasterTransactionOffset());
        this.consumerTopic = clusterConfig.getTopicPrefix() + "-unresolved";
        this.consumerTopicPartition = new TopicPartition(this.consumerTopic, 0);
        this.consumer = new KafkaConsumer<>(consumerProps);
        this.consumer.assign(Collections.singletonList(this.consumerTopicPartition));
        this.consumer.seek(this.consumerTopicPartition, schemaStats.getMasterTransactionOffset());

        this.masterProducerClientId = schemaStats.getMasterClientId();
        this.producerTopic = clusterConfig.getTopicPrefix() + "-resolved";
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", clusterConfig.getKafkaConfig());
        producerProps.put("client.id", this.masterProducerClientId);
        producerProps.put("key.serializer", ByteArraySerializer.class.getName());
        producerProps.put("value.serializer", ByteArraySerializer.class.getName());
        producerProps.put("acks", "1");
        // A single in-flight request keeps records in send order even when a request is retried.
        producerProps.put("max.in.flight.requests.per.connection", 1);
        producerProps.put("batch.size", 1000000);
        producerProps.put("max.request.size", 0xA00000); // 10 MiB
        this.producer = new KafkaProducer<>(producerProps);

        this.curatorClient = CuratorFrameworkFactory.newClient(clusterConfig.getZookeeperConfig(), new ExponentialBackoffRetry(1000, 3));
        this.leaderSelector = new LeaderSelector(this.curatorClient, LEADER_SELECTION_NODE, this);
        this.curatorClient.start();
        // Re-enter the election automatically whenever takeLeadership() returns.
        this.leaderSelector.autoRequeue();
        this.leaderSelector.start();
    }

    /**
     * Connection-state callback from Curator. Any non-connected state clears the master flag so
     * that the leadership loop stops after its current iteration.
     */
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        if (!connectionState.isConnected()) {
            logger.info("Lost connection to zookeeper");
            this.masterRole = false;
        }
    }

    /**
     * Leadership callback from Curator; leadership is held until this method returns.
     *
     * <p>First waits until {@link #getUnconsumedMessageCount()} reports nothing outstanding, then
     * seeks the consumer to the current master transaction offset and processes incoming
     * transactions until the master flag is cleared.
     *
     * @throws Exception whatever the processing loop throws; leadership is relinquished in that case
     */
    public void takeLeadership(CuratorFramework client) throws Exception {
        logger.info("START TRANSACTION MASTER: " + Thread.currentThread().getName());
        if (this.masterRole) {
            logger.warn("Error: starting master twice!");
            return;
        }
        this.masterRole = true;
        try {
            long unconsumedMessageCount;
            while (this.masterRole && (unconsumedMessageCount = this.getUnconsumedMessageCount()) > 0L) {
                logger.info("Master waiting for worker topics to be consumed:" + unconsumedMessageCount);
                Thread.sleep(1000L);
            }
            this.consumer.seek(this.consumerTopicPartition, this.schemaStats.getMasterTransactionOffset());
            while (this.masterRole) {
                logger.debug("Consume master transaction log...");
                this.handleMessages();
            }
        } finally {
            // Always drop the flag: if it stayed set after an exception, every later leadership
            // grant would hit the "starting master twice" guard above and return immediately.
            this.masterRole = false;
            logger.info("END TRANSACTION MASTER");
        }
    }

    /**
     * Computes {@code endOffset - 1 - masterTransactionOffset} for partition 0 of the
     * "-resolved" topic using a short-lived consumer.
     *
     * @return the computed difference, or {@link Integer#MAX_VALUE} if it could not be determined
     *         (which makes the caller keep waiting)
     */
    private long getUnconsumedMessageCount() {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", this.clusterConfig.getKafkaConfig());
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("auto.offset.reset", "latest");
        TopicPartition topicPartition = new TopicPartition(this.clusterConfig.getTopicPrefix() + "-resolved", 0);
        // try-with-resources: the temporary consumer is released even when the lookup fails.
        try (KafkaConsumer<byte[], byte[]> resolvedTopicConsumer = new KafkaConsumer<>(consumerProps)) {
            Map<TopicPartition, Long> endOffsets = resolvedTopicConsumer.endOffsets(Collections.singletonList(topicPartition));
            Long latestOffset = endOffsets.get(topicPartition);
            if (latestOffset == null) {
                logger.warn("No end offset available for " + topicPartition);
                return Integer.MAX_VALUE;
            }
            return latestOffset - 1L - this.schemaStats.getMasterTransactionOffset();
        } catch (Exception e) {
            logger.warn("Error reading end offset of " + topicPartition, e);
            return Integer.MAX_VALUE;
        }
    }

    /**
     * Polls the "-unresolved" partition once (up to three seconds) and processes every record:
     * the payload is passed through {@link PacketDataMingling#mingle} with the record's packet
     * key, turned into a {@link ClusterTransaction}, resolved and executed, and the result is
     * sent to the "-resolved" topic under a freshly created master message key. When resolution
     * yields no packet, a record with a {@code null} value is sent instead.
     *
     * @throws IOException propagated from packet parsing, execution or serialization
     */
    private void handleMessages() throws IOException {
        ConsumerRecords<byte[], byte[]> consumerRecords = this.consumer.poll(Duration.ofSeconds(3L));
        List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records(this.consumerTopicPartition);
        for (ConsumerRecord<byte[], byte[]> record : records) {
            TransactionMessageKey messageKey = new TransactionMessageKey(record.key());
            // NOTE(review): mingle() is applied on both the inbound and the outbound path with the
            // same secret, so it is presumably a symmetric transformation - confirm.
            byte[] bytes = PacketDataMingling.mingle(record.value(), this.sharedSecret, messageKey.getPacketKey());
            logger.debug("MASTER received new transaction:" + messageKey);
            TransactionPacket transactionPacket = new TransactionPacket(bytes);
            ClusterTransaction transaction = new ClusterTransaction(transactionPacket, this.dataBaseMapper);
            TransactionMessageKey masterMessageKey = TransactionMessageKey.createFromKey(messageKey, this.getNextKey(), this.masterProducerClientId, record.offset());
            transactionPacket = transaction.resolveAndExecuteTransaction(this.transactionIdHandler, transactionPacket);
            if (transactionPacket == null) {
                logger.info("Sending error packet...");
                // Pass the callback here as well so a failed error-packet write is logged too.
                this.producer.send(new ProducerRecord<byte[], byte[]>(this.producerTopic, masterMessageKey.getBytes(), (byte[]) null), this);
                continue;
            }
            logger.debug("MASTER send new transaction:" + masterMessageKey + ", transaction-id:" + transaction.getTransactionId());
            byte[] packetBytes = transactionPacket.writePacketBytes();
            byte[] mingledBytes = PacketDataMingling.mingle(packetBytes, this.sharedSecret, masterMessageKey.getPacketKey());
            this.producer.send(new ProducerRecord<byte[], byte[]>(this.producerTopic, masterMessageKey.getBytes(), mingledBytes), this);
        }
    }

    /**
     * Leaves the leader election and releases the producer and the ZooKeeper client.
     *
     * <p>The Kafka consumer is intentionally not closed here: {@link KafkaConsumer} does not
     * support concurrent access, and the leadership thread may still be inside {@code poll()}
     * when this method is called from another thread.
     */
    @Override
    public void close() throws IOException {
        this.masterRole = false;
        try {
            this.leaderSelector.close();
        } finally {
            try {
                // Waits for previously sent records to complete before releasing the producer.
                this.producer.close();
            } finally {
                this.curatorClient.close();
            }
        }
    }

    /** Returns the next packet key (pre-incremented counter, first value is 1). */
    private long getNextKey() {
        return ++this.packetKey;
    }

    /**
     * Kafka producer callback; logs a warning when a record could not be written.
     *
     * @param metadata metadata of the written record (unused)
     * @param e        the failure, or {@code null} on success
     */
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            logger.warn("Error writing master transaction:" + e.getMessage(), e);
        }
    }
}

