/*
 * Decompiled with CFR 0.152.
 */
package org.kie.hacep;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.kie.hacep.Config;
import org.kie.hacep.EnvConfig;
import org.kie.hacep.KafkaFullTopicsTests;
import org.kie.hacep.core.Bootstrap;
import org.kie.hacep.core.infra.election.State;
import org.kie.remote.RemoteKieSession;
import org.kie.remote.command.FireUntilHaltCommand;
import org.kie.remote.command.InsertCommand;
import org.kie.remote.command.RemoteCommand;
import org.kie.remote.message.ControlMessage;
import org.kie.remote.util.KafkaRemoteUtil;
import org.kie.remote.util.SerializationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PodAsLeaderTest
extends KafkaFullTopicsTests {
    private Logger logger = LoggerFactory.getLogger((String)"org.hacep");

    @Test(timeout=15000L)
    public void processOneSentMessageAsLeaderTest() {
        Bootstrap.startEngine((EnvConfig)this.envConfig);
        Bootstrap.getConsumerController().getCallback().updateStatus(State.LEADER);
        KafkaConsumer eventsConsumer = this.kafkaServerTest.getConsumer(this.envConfig.getEventsTopicName(), Config.getConsumerConfig((String)"eventsConsumerProcessOneSentMessageAsLeaderTest"));
        KafkaConsumer controlConsumer = this.kafkaServerTest.getConsumer(this.envConfig.getControlTopicName(), Config.getConsumerConfig((String)"controlConsumerProcessOneSentMessageAsLeaderTest"));
        Properties props = (Properties)Config.getProducerConfig((String)"InsertBatchStockTickets").clone();
        props.put("skip.listener.autostart", (Object)true);
        this.logger.warn("Insert Stock Ticket event");
        this.kafkaServerTest.insertBatchStockTicketEvent(1, this.topicsConfig, RemoteKieSession.class, props, KafkaRemoteUtil.getListener((Properties)props, (boolean)false));
        try {
            this.logger.warn("Checks on Events topic");
            AtomicReference firstEvent = new AtomicReference();
            AtomicReference secondEvent = new AtomicReference();
            AtomicInteger index = new AtomicInteger(0);
            AtomicInteger attempts = new AtomicInteger(0);
            while (index.get() < 2) {
                ConsumerRecords eventsRecords = eventsConsumer.poll(Duration.ofSeconds(2L));
                eventsRecords.forEach(o -> {
                    ConsumerRecord event = (ConsumerRecord)o;
                    Assert.assertNotNull((Object)event);
                    Assert.assertEquals((Object)event.topic(), (Object)this.envConfig.getEventsTopicName());
                    Assert.assertEquals((long)event.offset(), (long)index.get());
                    RemoteCommand remoteCommand = (RemoteCommand)SerializationUtil.deserialize((byte[])((byte[])event.value()));
                    this.logger.warn("Event {}:{} offset:{}", new Object[]{index.get(), remoteCommand, event.offset()});
                    Assert.assertNotNull((Object)remoteCommand.getId());
                    if (index.get() == 0) {
                        firstEvent.set(event);
                        Assert.assertTrue((boolean)(remoteCommand instanceof FireUntilHaltCommand));
                    }
                    if (index.get() == 1) {
                        Assert.assertTrue((boolean)(remoteCommand instanceof InsertCommand));
                        secondEvent.set(event);
                    }
                    index.incrementAndGet();
                });
                int attemptNumber = attempts.incrementAndGet();
                this.logger.warn("Attempt number:{}", (Object)attemptNumber);
                if (attempts.get() != 10) continue;
                throw new RuntimeException("No Events message available after " + attempts + "attempts.");
            }
            this.logger.warn("Checks on Control topic");
            ArrayList messages = new ArrayList();
            attempts.set(0);
            while (messages.size() < 2) {
                ConsumerRecords controlRecords = controlConsumer.poll(Duration.ofSeconds(2L));
                controlRecords.forEach(o -> {
                    ConsumerRecord control = (ConsumerRecord)o;
                    Assert.assertNotNull((Object)control);
                    ControlMessage controlMessage = (ControlMessage)SerializationUtil.deserialize((byte[])((byte[])control.value()));
                    controlMessage.setOffset(control.offset());
                    this.logger.warn("Control message found:{}", (Object)controlMessage);
                    messages.add(controlMessage);
                });
                int attemptNumber = attempts.incrementAndGet();
                this.logger.warn("Attempt number:{}", (Object)attemptNumber);
                if (attempts.get() != 10) continue;
                throw new RuntimeException("No control message available after " + attempts + "attempts.");
            }
            Assert.assertEquals((long)2L, (long)messages.size());
            AtomicReference fireUntilHalt = new AtomicReference();
            AtomicReference insert = new AtomicReference();
            index.set(0);
            messages.forEach(controlMessage -> {
                if (index.get() == 0) {
                    Assert.assertNotNull((Object)controlMessage);
                    fireUntilHalt.set(controlMessage);
                }
                if (index.get() == 1) {
                    Assert.assertNotNull((Object)controlMessage);
                    insert.set(controlMessage);
                }
                index.incrementAndGet();
            });
            Assert.assertEquals((Object)((ControlMessage)fireUntilHalt.get()).getId(), (Object)((ConsumerRecord)firstEvent.get()).key());
            Assert.assertTrue((boolean)((ControlMessage)fireUntilHalt.get()).getSideEffects().isEmpty());
            Assert.assertEquals((Object)((ControlMessage)insert.get()).getId(), (Object)((ConsumerRecord)secondEvent.get()).key());
            Assert.assertTrue((!((ControlMessage)insert.get()).getSideEffects().isEmpty() ? 1 : 0) != 0);
            this.logger.warn("Test ended, going to stop kafka");
        }
        catch (Exception ex) {
            throw new RuntimeException(ex.getMessage(), ex);
        }
        finally {
            eventsConsumer.close();
            this.logger.warn("Event consumer closed");
            controlConsumer.close();
            this.logger.warn("Control consumer closed");
        }
    }
}

