/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep.core.infra.utils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.kie.api.KieServices;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;
import org.kie.api.runtime.conf.ClockTypeOption;
import org.kie.api.runtime.conf.KieSessionOption;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.consumer.KieContainerUtils;
import org.kie.hacep.core.GlobalStatus;
import org.kie.hacep.core.infra.SessionSnapshooter;
import org.kie.hacep.core.infra.SnapshotInfos;
import org.kie.hacep.core.infra.message.SnapshotMessage;
import org.kie.hacep.core.infra.utils.SnapshotOnDemandUtils;
import org.kie.remote.TopicsConfig;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.command.SnapshotOnDemandCommand;
import org.kie.remote.impl.producer.Producer;
import org.kie.remote.impl.producer.Sender;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotOnDemandUtilsImpl
implements SnapshotOnDemandUtils {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotOnDemandUtilsImpl.class);

    @Override
    public SnapshotInfos askASnapshotOnDemand(EnvConfig config, SessionSnapshooter snapshooter, Producer producer) {
        LocalDateTime infosTime = snapshooter.getLastSnapshotTime();
        LocalDateTime limitAge = LocalDateTime.now().minusSeconds(config.getMaxSnapshotAge());
        if (infosTime != null && limitAge.isBefore(infosTime)) {
            if (logger.isInfoEnabled()) {
                logger.info("Deserialize a recent snapshot");
            }
            return snapshooter.deserialize();
        }
        if (logger.isInfoEnabled()) {
            logger.info("Build NewSnapshotOnDemand ");
        }
        return this.buildNewSnapshotOnDemand(config, limitAge, producer);
    }

    private SnapshotInfos buildNewSnapshotOnDemand(EnvConfig envConfig, LocalDateTime limitAge, Producer producer) {
        SnapshotMessage snapshotMsg = this.askAndReadSnapshotOnDemand(envConfig, limitAge, producer);
        KieSession kSession = null;
        KieContainer kieContainer = null;
        try (ByteArrayInputStream in = new ByteArrayInputStream(snapshotMsg.getSerializedSession());){
            KieServices ks = KieServices.get();
            kieContainer = KieContainerUtils.getKieContainer((EnvConfig)envConfig, (KieServices)ks);
            KieSessionConfiguration conf = ks.newKieSessionConfiguration();
            conf.setOption((KieSessionOption)ClockTypeOption.get((String)"pseudo"));
            kSession = ks.getMarshallers().newMarshaller(kieContainer.getKieBase()).unmarshall((InputStream)in, conf, null);
        }
        catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        return new SnapshotInfos(kSession, kieContainer, snapshotMsg.getFhManager(), snapshotMsg.getLastInsertedEventkey(), snapshotMsg.getLastInsertedEventOffset(), snapshotMsg.getTime(), snapshotMsg.getKjarGAV());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private SnapshotMessage askAndReadSnapshotOnDemand(EnvConfig envConfig, LocalDateTime limitAge, Producer producer) {
        Properties props = Config.getProducerConfig((String)"SnapshotOnDemandUtils.askASnapshotOnDemand");
        Sender sender = new Sender(props, producer);
        sender.start();
        sender.sendCommand((RemoteCommand)new SnapshotOnDemandCommand(), TopicsConfig.getDefaultTopicsConfig().getEventsTopicName());
        sender.stop();
        KafkaConsumer consumer = this.getConfiguredSnapshotConsumer(envConfig);
        boolean snapshotReady = false;
        SnapshotMessage msg = null;
        try {
            GlobalStatus.setCanBecomeLeader((boolean)false);
            int counter = 0;
            while (!snapshotReady) {
                SnapshotMessage snapshotMsg;
                ConsumerRecords records = consumer.poll(envConfig.getPollSnapshotDuration());
                byte[] bytes = null;
                for (ConsumerRecord record : records) {
                    bytes = (byte[])record.value();
                }
                SnapshotMessage snapshotMessage = snapshotMsg = bytes != null ? (SnapshotMessage)SerializationUtil.deserialize(bytes) : null;
                if (snapshotMsg != null && limitAge.isBefore(snapshotMsg.getTime())) {
                    snapshotReady = true;
                    msg = snapshotMsg;
                    continue;
                }
                if (++counter <= envConfig.getMaxSnapshotRequestAttempts()) continue;
                GlobalStatus.setNodeLive((boolean)false);
                String errorMessage = "Impossible to retrieve a snapshot and start after " + counter + " attempts";
                logger.error(errorMessage);
                throw new IllegalStateException(errorMessage);
            }
        }
        finally {
            consumer.close();
            GlobalStatus.setCanBecomeLeader((boolean)true);
        }
        return msg;
    }

    @Override
    public KafkaConsumer getConfiguredSnapshotConsumer(EnvConfig envConfig) {
        KafkaConsumer consumer = new KafkaConsumer(Config.getSnapshotConsumerConfig());
        List partitionsInfo = consumer.partitionsFor(envConfig.getSnapshotTopicName());
        ArrayList<TopicPartition> partitionCollection = new ArrayList<TopicPartition>();
        if (partitionsInfo != null) {
            for (PartitionInfo partition : partitionsInfo) {
                TopicPartition topicPartition2 = new TopicPartition(partition.topic(), partition.partition());
                partitionCollection.add(topicPartition2);
            }
            if (!partitionCollection.isEmpty()) {
                consumer.assign(partitionCollection);
            }
        }
        consumer.assignment().forEach(topicPartition -> consumer.seekToBeginning(partitionCollection));
        return consumer;
    }
}

