/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.KafkaUtils;
import org.kie.hacep.PrinterKafkaImpl;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.core.infra.message.SnapshotMessage;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotOnDemandTest {
    private KafkaUtils kafkaServerTest;
    private EnvConfig config;
    private Logger logger = LoggerFactory.getLogger(SnapshotOnDemandTest.class);

    public static EnvConfig getEnvConfig() {
        return EnvConfig.anEnvConfig().withNamespace("default").withControlTopicName("control").withEventsTopicName("events").withSnapshotTopicName("snapshot").withKieSessionInfosTopicName("kiesessioninfos").withPrinterType(PrinterKafkaImpl.class.getName()).withPollTimeUnit("sec").withPollTimeout("10").withPollSnapshotTimeUnit("sec").withPollSnapshotTimeout("10").withMaxSnapshotAgeSeconds("60000").underTest(true);
    }

    @Before
    public void setUp() throws Exception {
        this.config = SnapshotOnDemandTest.getEnvConfig();
        this.kafkaServerTest = new KafkaUtils();
        this.kafkaServerTest.startServer();
    }

    @After
    public void tearDown() {
        this.kafkaServerTest.tearDown();
    }

    @Test(timeout=30000L)
    public void createSnapshotOnDemandTest() {
        Bootstrap.startEngine((EnvConfig)this.config);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);
        KafkaConsumer eventsConsumer = this.kafkaServerTest.getConsumer(this.config.getEventsTopicName(), Config.getConsumerConfig((String)"SnapshotOnDemandTest.createSnapshotOnDemandTest"));
        KafkaConsumer controlConsumer = this.kafkaServerTest.getConsumer(this.config.getControlTopicName(), Config.getConsumerConfig((String)"SnapshotOnDemandTest.createSnapshotOnDemandTest"));
        KafkaConsumer snapshotConsumer = this.kafkaServerTest.getConsumer(this.config.getSnapshotTopicName(), Config.getConsumerConfig((String)"SnapshotOnDemandTest.createSnapshotOnDemandTest"));
        try {
            ConsumerRecords eventsRecords = eventsConsumer.poll(Duration.ofSeconds(2L));
            Assert.assertEquals((long)0L, (long)eventsRecords.count());
            ConsumerRecords controlRecords = controlConsumer.poll(Duration.ofSeconds(2L));
            Assert.assertEquals((long)0L, (long)controlRecords.count());
            ConsumerRecords snapshotRecords = snapshotConsumer.poll(Duration.ofSeconds(2L));
            Assert.assertEquals((long)0L, (long)snapshotRecords.count());
            KafkaUtils.insertSnapshotOnDemandCommand();
            ArrayList messages = new ArrayList();
            AtomicInteger attempts = new AtomicInteger(0);
            while (messages.size() < 1) {
                snapshotRecords = snapshotConsumer.poll(Duration.ofSeconds(5L));
                snapshotRecords.forEach(o -> {
                    ConsumerRecord controlRecord = (ConsumerRecord)o;
                    SnapshotMessage snapshotMessage = (SnapshotMessage)SerializationUtil.deserialize((byte[])((byte[])controlRecord.value()));
                    messages.add(snapshotMessage);
                });
                int attemptNumber = attempts.incrementAndGet();
                this.logger.warn("Attempt number:{}", (Object)attemptNumber);
                if (attempts.get() != 10) continue;
                throw new RuntimeException("No control message available after " + attempts + "attempts in waitForControlMessage");
            }
            Assert.assertEquals((long)1L, (long)messages.size());
            Iterator messagesIter = messages.iterator();
            SnapshotMessage msg = (SnapshotMessage)messagesIter.next();
            Assert.assertNotNull((Object)msg);
            Assert.assertTrue((boolean)msg.getFhManager().getFhMapKeys().isEmpty());
            Assert.assertEquals((long)0L, (long)msg.getLastInsertedEventOffset());
            Assert.assertNotNull((Object)msg.getSerializedSession());
            eventsRecords = eventsConsumer.poll(Duration.ofSeconds(1L));
            Assert.assertEquals((long)1L, (long)eventsRecords.count());
            controlRecords = controlConsumer.poll(Duration.ofSeconds(1L));
            Assert.assertEquals((long)1L, (long)controlRecords.count());
        }
        catch (Exception ex) {
            throw new RuntimeException(ex.getMessage(), ex);
        }
        finally {
            eventsConsumer.close();
            controlConsumer.close();
            snapshotConsumer.close();
            Bootstrap.stopEngine();
        }
    }
}

