/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.persistence.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.infinispan.protostream.BaseMarshaller;
import org.kie.api.marshalling.ObjectMarshallingStrategy;
import org.kie.kogito.persistence.kafka.KafkaPersistenceUtils;
import org.kie.kogito.persistence.protobuf.ProtoStreamObjectMarshallingStrategy;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceDuplicatedException;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.impl.AbstractProcessInstance;
import org.kie.kogito.process.impl.marshalling.ProcessInstanceMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MutableProcessInstances} implementation that writes marshalled process
 * instances to a Kafka topic through a {@link KafkaProducer} and reads them back
 * from a {@link ReadOnlyKeyValueStore}.
 *
 * <p>The store is not supplied at construction time; it is handed over later via
 * {@link #setStore(ReadOnlyKeyValueStore)}. Until then, any operation that needs
 * the store blocks for up to one minute waiting for it.
 *
 * <p>Writes go to the topic while reads come from the store, so a read issued
 * right after a write only sees that write once the store reflects it
 * (NOTE(review): how the store is fed from the topic is not visible in this class).
 */
public class KafkaProcessInstances implements MutableProcessInstances {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProcessInstances.class);

    private final Process<?> process;
    private final KafkaProducer<String, byte[]> producer;
    private final String topic;
    // Written by setStore() and read by getStore() without any other synchronisation on
    // the fast path; volatile guarantees the reader sees the published reference.
    private volatile ReadOnlyKeyValueStore<String, byte[]> store;
    private ProcessInstanceMarshaller marshaller;
    // Released once by setStore(); getStoreAwait() blocks on it.
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Creates the instance repository for one process definition.
     *
     * @param process     the process whose instances are stored; its id determines the topic name
     * @param producer    the producer used to publish instance state
     * @param proto       proto definition passed to the {@link ProtoStreamObjectMarshallingStrategy}
     * @param marshallers additional marshallers passed to the {@link ProtoStreamObjectMarshallingStrategy}
     */
    public KafkaProcessInstances(Process<?> process, KafkaProducer<String, byte[]> producer, String proto, BaseMarshaller<?>... marshallers) {
        this.process = process;
        this.topic = KafkaPersistenceUtils.topicName(process.id());
        this.producer = producer;
        this.setMarshaller(new ProcessInstanceMarshaller(new ObjectMarshallingStrategy[]{new ProtoStreamObjectMarshallingStrategy(proto, marshallers)}));
    }

    /**
     * @return the process this repository stores instances for
     */
    protected Process<?> getProcess() {
        return this.process;
    }

    /**
     * Returns the backing store, waiting for it to be provided if necessary.
     *
     * @return the store, never {@code null}
     * @throws RuntimeException if the store is not available within one minute, or the
     *                          waiting thread is interrupted
     */
    protected ReadOnlyKeyValueStore<String, byte[]> getStore() {
        // Single volatile read so the null check and the return see the same value.
        ReadOnlyKeyValueStore<String, byte[]> current = this.store;
        if (current != null) {
            return current;
        }
        return this.getStoreAwait();
    }

    /**
     * Provides the backing store and releases any thread waiting in {@link #getStore()}.
     *
     * @param store the store to read process instances from
     */
    protected void setStore(ReadOnlyKeyValueStore<String, byte[]> store) {
        this.store = store;
        this.latch.countDown();
    }

    /**
     * Blocks for up to one minute until {@link #setStore(ReadOnlyKeyValueStore)} has been called.
     *
     * @return the store, never {@code null}
     * @throws RuntimeException on timeout, on interruption (the interrupt flag is restored),
     *                          or if the latch was released with a {@code null} store
     */
    private ReadOnlyKeyValueStore<String, byte[]> getStoreAwait() {
        boolean released;
        try {
            released = this.latch.await(1L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            throw failure("Failed to obtain Kafka Store for process: " + this.process.id(), e);
        }
        // Thrown outside the try so the timeout is reported with its own message instead of
        // being re-wrapped under a generic "Failed to obtain" exception.
        if (!released) {
            throw new RuntimeException("Timeout waiting to obtain Kafka Store for process: " + this.process.id());
        }
        ReadOnlyKeyValueStore<String, byte[]> current = this.store;
        if (current == null) {
            throw new RuntimeException("Failed to obtain Kafka Store for process: " + this.process.id());
        }
        return current;
    }

    /**
     * Replaces the marshaller used to (de)serialise process instances.
     *
     * @param marshaller the marshaller to use from now on
     */
    protected void setMarshaller(ProcessInstanceMarshaller marshaller) {
        this.marshaller = marshaller;
    }

    /**
     * @param id the process instance id
     * @return {@code true} if the store currently holds a value for {@code id}
     */
    public boolean exists(String id) {
        return this.getStore().get(id) != null;
    }

    /**
     * Persists a new process instance. Does nothing when {@code isActive(instance)} is false.
     *
     * @param id       the process instance id, used as the record key
     * @param instance the instance to persist
     * @throws ProcessInstanceDuplicatedException if the store already holds a value for {@code id}
     * @throws RuntimeException                   if marshalling output cannot be published or the
     *                                            instance cannot be disconnected afterwards
     */
    public void create(String id, ProcessInstance instance) {
        if (!this.isActive(instance)) {
            return;
        }
        if (this.getStore().get(id) != null) {
            throw new ProcessInstanceDuplicatedException(id);
        }
        byte[] data = this.marshaller.marshallProcessInstance(instance);
        try {
            // get() blocks until the send has completed (or failed).
            this.producer.send(new ProducerRecord<String, byte[]>(this.topic, id, data)).get();
            this.disconnect(instance);
        } catch (Exception e) {
            throw failure("Unable to persist process instance id: " + id, e);
        }
    }

    /**
     * Publishes the current state of an existing process instance. Does nothing when
     * {@code isActive(instance)} is false.
     *
     * @param id       the process instance id, used as the record key
     * @param instance the instance to persist
     * @throws RuntimeException if the state cannot be published or the instance cannot be
     *                          disconnected afterwards
     */
    public void update(String id, ProcessInstance instance) {
        if (!this.isActive(instance)) {
            return;
        }
        byte[] data = this.marshaller.marshallProcessInstance(instance);
        try {
            this.producer.send(new ProducerRecord<String, byte[]>(this.topic, id, data)).get();
            this.disconnect(instance);
        } catch (Exception e) {
            throw failure("Unable to update process instance id: " + id, e);
        }
    }

    /**
     * Removes a process instance by publishing a record with a {@code null} value for its id.
     *
     * @param id the process instance id
     * @throws RuntimeException if the record cannot be published
     */
    public void remove(String id) {
        try {
            this.producer.send(new ProducerRecord<String, byte[]>(this.topic, id, null)).get();
        } catch (Exception e) {
            throw failure("Unable to remove process instance id: " + id, e);
        }
    }

    /**
     * Looks up a process instance in the store.
     *
     * @param id   the process instance id
     * @param mode {@link ProcessInstanceReadMode#MUTABLE} for a mutable instance; any other
     *             value yields a read-only instance
     * @return the unmarshalled instance, or empty if the store has no value for {@code id}
     */
    public Optional<ProcessInstance> findById(String id, ProcessInstanceReadMode mode) {
        byte[] data = this.getStore().get(id);
        if (data == null) {
            return Optional.empty();
        }
        return Optional.of(this.unmarshall(data, mode));
    }

    /**
     * Reads every process instance currently in the store.
     *
     * @param mode {@link ProcessInstanceReadMode#MUTABLE} for mutable instances; any other
     *             value yields read-only instances
     * @return the unmarshalled instances
     * @throws RuntimeException if iterating the store or unmarshalling an entry fails
     */
    public Collection<ProcessInstance> values(ProcessInstanceReadMode mode) {
        // Resolved outside the try so store-availability failures keep their own message.
        ReadOnlyKeyValueStore<String, byte[]> currentStore = this.getStore();
        Collection<ProcessInstance> instances = new ArrayList<>();
        // The iterator must be closed; try-with-resources does so on every path.
        try (KeyValueIterator<String, byte[]> iterator = currentStore.all()) {
            while (iterator.hasNext()) {
                instances.add(this.unmarshall(iterator.next().value, mode));
            }
            return instances;
        } catch (Exception e) {
            throw new RuntimeException("Unable to read process instances ", e);
        }
    }

    /**
     * @return the store's approximate entry count, capped at {@link Integer#MAX_VALUE}
     */
    public Integer size() {
        // Clamp before narrowing: a plain (int) cast would wrap around for large counts.
        return (int) Math.min((long) Integer.MAX_VALUE, this.getStore().approximateNumEntries());
    }

    /**
     * Detaches the in-memory state from {@code instance}, handing it a supplier that reloads
     * the workflow process instance from the store on demand.
     *
     * <p>The supplier returns {@code null} (after logging an error) if reloading throws a
     * {@link RuntimeException}.
     *
     * @param instance the instance to disconnect; must be an {@link AbstractProcessInstance}
     */
    protected void disconnect(ProcessInstance instance) {
        ((AbstractProcessInstance) instance).internalRemoveProcessInstance(() -> {
            try {
                byte[] reloaded = this.getStore().get(instance.id());
                return this.marshaller.unmarshallWorkflowProcessInstance(reloaded, this.process);
            } catch (RuntimeException e) {
                LOGGER.error("Unexpected exception thrown when reloading process instance {}", instance.id(), e);
                return null;
            }
        });
    }

    /** Unmarshals {@code data} as a mutable or read-only instance depending on {@code mode}. */
    private ProcessInstance unmarshall(byte[] data, ProcessInstanceReadMode mode) {
        if (mode == ProcessInstanceReadMode.MUTABLE) {
            return this.marshaller.unmarshallProcessInstance(data, this.process);
        }
        return this.marshaller.unmarshallReadOnlyProcessInstance(data, this.process);
    }

    /**
     * Wraps {@code cause} in a {@link RuntimeException}, restoring the thread's interrupt flag
     * first when the cause is an {@link InterruptedException} so callers further up can still
     * observe the interruption.
     */
    private static RuntimeException failure(String message, Exception cause) {
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new RuntimeException(message, cause);
    }
}

