/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.KafkaFullTopicsTests;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.hacep.core.infra.message.SnapshotMessage;
import org.kie.remote.CommonConfig;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.util.KafkaRemoteUtil;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that starts the engine, switches it to {@link State#LEADER},
 * inserts a batch of stock-ticket events and then checks that the expected
 * number of records shows up on the events, control and snapshot topics, and
 * that every snapshot record carries a populated {@link SnapshotMessage}.
 */
public class PodAsLeaderSnapshotTest extends KafkaFullTopicsTests {

    private static final Logger LOGGER = LoggerFactory.getLogger("org.hacep");

    /** Number of polls after which a topic check gives up with an exception. */
    private static final int MAX_ATTEMPTS = 30;

    /** Maximum time a single poll blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3L);

    /** Number of events handed to {@code insertBatchStockTicketEvent}. */
    private static final int INSERTED_EVENTS = 10;

    /**
     * Records expected on the events topic and on the control topic.
     * NOTE(review): this is one more than {@link #INSERTED_EVENTS}; presumably
     * the extra record is produced by the engine/snapshot flow - confirm.
     */
    private static final int EXPECTED_EVENTS = 11;

    /** Records expected on the snapshot topic. */
    private static final int EXPECTED_SNAPSHOTS = 1;

    /** Number of keys each snapshot is asserted to hold. */
    private static final int EXPECTED_SNAPSHOT_KEYS = 9;

    /**
     * Runs the leader scenario end to end and verifies the three topics.
     *
     * <p>All three consumers are opened in a try-with-resources statement so
     * that each of them (including the control consumer) is closed whatever
     * the outcome, even when inserting the events or an assertion fails.
     *
     * <p>Note that the JUnit timeout (30 s) is shorter than
     * {@code MAX_ATTEMPTS * POLL_TIMEOUT}, so when polls block for their full
     * timeout the JUnit timeout fires before the attempt limit is reached.
     *
     * @throws RuntimeException if a topic does not deliver the expected number
     *         of records within {@link #MAX_ATTEMPTS} polls, or if any other
     *         exception is raised while inserting or polling (the original
     *         exception is kept as the cause)
     */
    @Test(timeout = 30000L)
    public void processMessagesAsLeaderAndCreateSnapshotTest() {
        Bootstrap.startEngine(this.envConfig);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);

        try (KafkaConsumer<?, ?> eventsConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getEventsTopicName(),
                     Config.getConsumerConfig("eventsProcessMessagesAsLeaderAndCreateSnapshotTest"));
             KafkaConsumer<?, ?> snapshotConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getSnapshotTopicName(),
                     Config.getSnapshotConsumerConfig());
             KafkaConsumer<?, ?> controlConsumer = this.kafkaServerTest.getConsumer(
                     this.envConfig.getControlTopicName(),
                     Config.getConsumerConfig("controlProcessMessagesAsLeaderAndCreateSnapshotTest"))) {

            // The consumers are created before the events are inserted, as in the original flow.
            this.kafkaServerTest.insertBatchStockTicketEvent(
                    INSERTED_EVENTS,
                    this.topicsConfig,
                    RemoteKieSession.class,
                    KafkaRemoteUtil.getListener(CommonConfig.getTestProperties(), false));

            LOGGER.warn("Checks on Events Topic");
            Assert.assertEquals(EXPECTED_EVENTS,
                                pollUntilReceived(eventsConsumer, "events", EXPECTED_EVENTS));

            LOGGER.warn("Checks on Control Topic");
            Assert.assertEquals(EXPECTED_EVENTS,
                                pollUntilReceived(controlConsumer, "control", EXPECTED_EVENTS));

            LOGGER.warn("Checks on Snapshot Topic");
            Assert.assertEquals(EXPECTED_SNAPSHOTS,
                                pollUntilReceived(snapshotConsumer, "snapshot", EXPECTED_SNAPSHOTS,
                                                  PodAsLeaderSnapshotTest::assertValidSnapshot));
        } catch (Exception ex) {
            // Surface the failure as an unchecked exception while preserving the cause.
            throw new RuntimeException(ex.getMessage(), ex);
        }
    }

    /**
     * Polls {@code consumer} until at least {@code expected} records were
     * received, without inspecting the individual records.
     *
     * @param consumer   consumer to poll
     * @param topicLabel lower-case label of the topic, used in log and error messages
     * @param expected   number of records to wait for
     * @return the total number of records received (may exceed {@code expected})
     * @throws RuntimeException if the attempt limit is reached first
     */
    private static int pollUntilReceived(KafkaConsumer<?, ?> consumer, String topicLabel, int expected) {
        return pollUntilReceived(consumer, topicLabel, expected, record -> { });
    }

    /**
     * Polls {@code consumer} until at least {@code expected} records were
     * received, passing every polled record to {@code recordCheck}.
     *
     * @param consumer    consumer to poll
     * @param topicLabel  lower-case label of the topic, used in log and error messages
     * @param expected    number of records to wait for
     * @param recordCheck callback invoked for each polled record, before the
     *                    attempt counter is advanced
     * @return the total number of records received (may exceed {@code expected})
     * @throws RuntimeException if the {@link #MAX_ATTEMPTS}-th poll is reached
     *         while fewer than {@code expected} records had been counted
     */
    private static int pollUntilReceived(KafkaConsumer<?, ?> consumer,
                                         String topicLabel,
                                         int expected,
                                         Consumer<ConsumerRecord<?, ?>> recordCheck) {
        // The error message uses the capitalized label ("Events", "Control", "Snapshot").
        String capitalizedLabel = Character.toUpperCase(topicLabel.charAt(0)) + topicLabel.substring(1);
        int received = 0;
        int attempt = 0;
        while (received < expected) {
            ConsumerRecords<?, ?> records = consumer.poll(POLL_TIMEOUT);
            for (ConsumerRecord<?, ?> record : records) {
                recordCheck.accept(record);
            }
            attempt++;
            LOGGER.warn("Attempt number on {} topic:{}", topicLabel, attempt);
            // The limit is checked before the records of this poll are counted,
            // so the reported count excludes the final poll.
            if (attempt == MAX_ATTEMPTS) {
                throw new RuntimeException("No enough " + capitalizedLabel + " message available "
                                           + received + " after " + attempt + "attempts.");
            }
            received += records.count();
        }
        return received;
    }

    /**
     * Deserializes the value of a snapshot-topic record and asserts that the
     * resulting {@link SnapshotMessage} is populated.
     *
     * @param record record polled from the snapshot topic; its value is cast to {@code byte[]}
     */
    private static void assertValidSnapshot(ConsumerRecord<?, ?> record) {
        SnapshotMessage snapshot = (SnapshotMessage) SerializationUtil.deserialize((byte[]) record.value());
        Assert.assertNotNull(snapshot);
        Assert.assertTrue(snapshot.getLastInsertedEventOffset() > 0L);
        Assert.assertFalse(snapshot.getFhMapKeys().isEmpty());
        Assert.assertNotNull(snapshot.getLastInsertedEventkey());
        Assert.assertEquals(EXPECTED_SNAPSHOT_KEYS, snapshot.getFhMapKeys().size());
    }
}

