/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.broker.store.leveldb;

import com.github.artbits.quickio.api.Collection;
import com.github.artbits.quickio.api.JDB;
import com.github.artbits.quickio.core.Config;
import com.github.artbits.quickio.core.IOEntity;
import com.github.artbits.quickio.core.QuickIO;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.noear.folkmq.broker.MqBorkerInternal;
import org.noear.folkmq.broker.MqDraft;
import org.noear.folkmq.broker.MqMessageHolder;
import org.noear.folkmq.broker.MqQueue;
import org.noear.folkmq.broker.MqStoreBase;
import org.noear.folkmq.broker.store.leveldb.MessageDoc;
import org.noear.folkmq.broker.store.leveldb.SubscribeDoc;
import org.noear.folkmq.common.MqMetasResolver;
import org.noear.folkmq.common.MqUtils;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.MessageInternal;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.EntityDefault;
import org.noear.socketd.transport.core.entity.MessageBuilder;
import org.noear.socketd.utils.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqLevelDbStore
extends MqStoreBase {
    protected static final Logger log = LoggerFactory.getLogger(MqLevelDbStore.class);
    private MqBorkerInternal serverRef;
    private JDB db;
    private Collection<SubscribeDoc> subscribeDocColl;
    private Collection<MessageDoc> messageDocColl;
    private final AtomicBoolean inSaveProcess = new AtomicBoolean(false);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    public MqLevelDbStore() {
        this(null);
    }

    public MqLevelDbStore(String dataPath) {
        if (StrUtils.isEmpty((String)dataPath)) {
            dataPath = "data/ldb/";
        }
        String dataPath2 = dataPath;
        this.db = QuickIO.db((Config)Config.of(c -> c.path(dataPath2).name("folkmq").cache(Long.valueOf(10010001000L))));
        this.subscribeDocColl = this.db.collection(SubscribeDoc.class);
        this.messageDocColl = this.db.collection(MessageDoc.class);
    }

    public String getName() {
        return "leveldb";
    }

    public void init(MqBorkerInternal serverInternal) {
        this.serverRef = serverInternal;
    }

    public void onStartBefore() {
        this.isStarted.set(false);
        try {
            this.loadSubscribeMap();
            this.loadQueue();
        }
        finally {
            this.isStarted.set(true);
        }
    }

    private void loadSubscribeMap() {
        try {
            for (SubscribeDoc subs : this.subscribeDocColl.findAll()) {
                String consumerGroup = subs.queueName.split("#")[1];
                this.serverRef.subscribeDo(subs.topic, consumerGroup, null);
            }
            log.info("Server persistent load subscribeMap completed");
        }
        catch (Exception e) {
            log.warn("Server persistent load subscribeMap failed", (Throwable)e);
        }
    }

    private void loadQueue() {
        Map subscribeMap = this.serverRef.getSubscribeMap();
        if (subscribeMap.size() == 0) {
            return;
        }
        ArrayList topicList = new ArrayList(subscribeMap.keySet());
        HashSet queueNameSet = new HashSet();
        for (String topic : topicList) {
            Set tmp = (Set)subscribeMap.get(topic);
            if (tmp == null) continue;
            queueNameSet.addAll(tmp);
        }
        for (String queueName : queueNameSet) {
            try {
                this.loadQueue1(queueName);
                log.info("Server persistent load messageQueue completed, queueName={}", (Object)queueName);
            }
            catch (Exception e) {
                log.warn("Server persistent load messageQueue failed, queueName={}", (Object)queueName, (Object)e);
            }
        }
    }

    private boolean loadQueue1(String queueName) throws IOException {
        List msgList = this.messageDocColl.find(m -> m.queueName.equals(queueName));
        for (MessageDoc msg : msgList) {
            if (msg.data == null) continue;
            EntityDefault entity = new EntityDefault();
            entity.dataSet(msg.data.getBytes(StandardCharsets.UTF_8));
            entity.metaStringSet(msg.metaString);
            MessageInternal message = new MessageBuilder().sid(StrUtils.guid()).flag(40).entity((Entity)entity).build();
            MqMetasResolver mr = MqUtils.getOf((Entity)message);
            MqDraft draft = new MqDraft(mr, (Message)message);
            MqQueue queue = this.serverRef.getQueue(queueName);
            this.serverRef.routingToQueueDo(draft, queue, msg.objectId());
        }
        return true;
    }

    public void onStopAfter() {
        this.isStarted.set(false);
    }

    public void onSubscribe(String topic, String consumerGroup, Session session) {
        SubscribeDoc doc = new SubscribeDoc();
        doc.topic = topic;
        doc.queueName = topic + "#" + consumerGroup;
        this.subscribeDocColl.save((IOEntity)doc);
    }

    public void onRouting(MqMessageHolder messageHolder) {
        if (messageHolder.getId() > 0L) {
            return;
        }
        MessageDoc doc = new MessageDoc();
        doc.ver = 2;
        doc.queueName = messageHolder.getQueueName();
        doc.metaString = messageHolder.getEntity().metaString();
        doc.data = messageHolder.getEntity().dataAsString();
        this.messageDocColl.save((IOEntity)doc);
        messageHolder.setId(doc.objectId());
    }

    public void onDistribute(String topic, String consumerGroup, MqMessageHolder messageHolder) {
    }

    public void onAcknowledge(String topic, String consumerGroup, MqMessageHolder messageHolder, boolean isOk) {
        MessageDoc msg;
        if (!isOk && (msg = (MessageDoc)this.messageDocColl.findOne(messageHolder.getId())) != null) {
            msg.metaString = messageHolder.getEntity().metaString();
            this.messageDocColl.save((IOEntity)msg);
        }
    }

    public void onRemove(String topic, String consumerGroup, MqMessageHolder messageHolder) {
        if (messageHolder.getId() > 0L) {
            this.messageDocColl.delete(messageHolder.getId());
        }
    }
}

