/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.broker.store.fdb;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.noear.folkmq.broker.MqBorkerInternal;
import org.noear.folkmq.broker.MqDraft;
import org.noear.folkmq.broker.MqMessageHolder;
import org.noear.folkmq.broker.MqQueue;
import org.noear.folkmq.broker.MqQueueDefault;
import org.noear.folkmq.broker.MqStoreBase;
import org.noear.folkmq.common.MqMetasResolver;
import org.noear.folkmq.common.MqUtils;
import org.noear.folkmq.utils.IoUtils;
import org.noear.snack.ONode;
import org.noear.snack.core.Feature;
import org.noear.snack.core.Options;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.MessageInternal;
import org.noear.socketd.transport.core.entity.EntityDefault;
import org.noear.socketd.transport.core.entity.MessageBuilder;
import org.noear.socketd.utils.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqSnapshotStore
extends MqStoreBase {
    protected static final Logger log = LoggerFactory.getLogger(MqSnapshotStore.class);
    private static final String file_suffix = ".fdb";
    private MqBorkerInternal serverRef;
    private final File directory;
    private final AtomicBoolean inSaveProcess = new AtomicBoolean(false);
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    public MqSnapshotStore() {
        this(null);
    }

    public MqSnapshotStore(String dataPath) {
        if (StrUtils.isEmpty((String)dataPath)) {
            dataPath = "./data/fdb/";
        }
        this.directory = new File(dataPath);
        if (!this.directory.exists()) {
            this.directory.mkdirs();
        }
    }

    private void saveCommit(File fileTmp, String fileName) throws IOException {
        File file;
        File fileBak = new File(this.directory, fileName + ".bak");
        if (fileBak.exists()) {
            fileBak.delete();
        }
        if ((file = new File(this.directory, fileName)).exists()) {
            file.renameTo(fileBak);
        }
        fileTmp.renameTo(file);
    }

    public String getName() {
        return "fdb";
    }

    public void init(MqBorkerInternal serverInternal) {
        this.serverRef = serverInternal;
    }

    public void onStartBefore() {
        this.isStarted.set(false);
        try {
            this.loadSubscribeMap();
            this.loadQueue();
        }
        finally {
            this.isStarted.set(true);
        }
    }

    public void onStartAfter() {
    }

    private void loadSubscribeMap() {
        File subscribeMapFile = new File(this.directory, "subscribe-map.fdb");
        if (!subscribeMapFile.exists()) {
            return;
        }
        try {
            if (!this.loadSubscribeMapNewDo(subscribeMapFile)) {
                this.loadSubscribeMapOldDo(subscribeMapFile);
            }
            log.info("Server persistent load subscribeMap completed");
        }
        catch (Exception e) {
            log.warn("Server persistent load subscribeMap failed", (Throwable)e);
        }
    }

    private void loadSubscribeMapOldDo(File subscribeMapFile) throws Exception {
        String subscribeMapJsonStr = MqSnapshotStore.readSnapshotFile(subscribeMapFile);
        ONode subscribeMapJson = ONode.loadStr((String)subscribeMapJsonStr, (Feature[])new Feature[]{Feature.DisThreadLocal});
        for (String topic : subscribeMapJson.obj().keySet()) {
            ONode oQueueNameList = subscribeMapJson.get(topic);
            for (ONode oQueueName : oQueueNameList.ary()) {
                String consumerGroup = oQueueName.getString().split("#")[1];
                this.serverRef.subscribeDo(topic, consumerGroup, null);
            }
        }
    }

    /*
     * Unable to fully structure code
     */
    private boolean loadSubscribeMapNewDo(File subscribeMapFile) throws Exception {
        isNewStyle = false;
        reader = new BufferedReader(new FileReader(subscribeMapFile));
        var4_4 = null;
        try {
            block12: while (true) {
                if ((topicJsonStr = reader.readLine()) == null) {
                    break;
                }
                if (topicJsonStr.equals("{")) {
                    if (isNewStyle) continue;
                    var6_9 = false;
                    return var6_9;
                }
                isNewStyle = true;
                if (!topicJsonStr.endsWith("}")) continue;
                oNode = ONode.loadStr((String)topicJsonStr, (Feature[])new Feature[]{Feature.DisThreadLocal});
                topic = oNode.get("topic").getString();
                oQueueNameList = oNode.get("queues");
                var9_13 = oQueueNameList.ary().iterator();
                while (true) {
                    if (var9_13.hasNext()) ** break;
                    continue block12;
                    oQueueName = (ONode)var9_13.next();
                    consumerGroup = oQueueName.getString().split("#")[1];
                    this.serverRef.subscribeDo(topic, consumerGroup, null);
                }
                break;
            }
        }
        catch (Throwable var5_7) {
            var4_4 = var5_7;
            throw var5_7;
        }
        finally {
            if (reader != null) {
                if (var4_4 != null) {
                    try {
                        reader.close();
                    }
                    catch (Throwable var5_6) {
                        var4_4.addSuppressed(var5_6);
                    }
                } else {
                    reader.close();
                }
            }
        }
        return true;
    }

    private void loadQueue() {
        Map subscribeMap = this.serverRef.getSubscribeMap();
        if (subscribeMap.size() == 0) {
            return;
        }
        ArrayList topicList = new ArrayList(subscribeMap.keySet());
        HashSet queueNameSet = new HashSet();
        for (String topic : topicList) {
            Set tmp = (Set)subscribeMap.get(topic);
            if (tmp == null) continue;
            queueNameSet.addAll(tmp);
        }
        for (String queueName : queueNameSet) {
            try {
                this.loadQueue1(queueName);
                log.info("Server persistent load messageQueue completed, queueName={}", (Object)queueName);
            }
            catch (Exception e) {
                log.warn("Server persistent load messageQueue failed, queueName={}", (Object)queueName, (Object)e);
            }
        }
    }

    private boolean loadQueue1(String queueName) throws IOException {
        String queueFileName = queueName.replace("#", "/") + file_suffix;
        File queueFile = new File(this.directory, queueFileName);
        if (!queueFile.exists()) {
            return false;
        }
        try (BufferedReader reader = new BufferedReader(new FileReader(queueFile));){
            String messageJsonStr;
            while ((messageJsonStr = reader.readLine()) != null) {
                if (!messageJsonStr.endsWith("}")) continue;
                ONode messageJson = ONode.loadStr((String)messageJsonStr, (Feature[])new Feature[]{Feature.DisThreadLocal});
                int ver = messageJson.get("v").getInt();
                String metaString = messageJson.get("meta").getString();
                String data = messageJson.get("data").getString();
                if (data == null) continue;
                EntityDefault entity = new EntityDefault();
                if (ver < 2) {
                    entity.dataSet(data.getBytes(StandardCharsets.UTF_8));
                } else {
                    entity.dataSet(Base64.getDecoder().decode(data));
                }
                entity.metaStringSet(metaString);
                MessageInternal message = new MessageBuilder().sid(StrUtils.guid()).flag(40).entity((Entity)entity).build();
                MqMetasResolver mr = MqUtils.getOf((Entity)message);
                MqDraft draft = new MqDraft(mr, (Message)message);
                MqQueue queue = this.serverRef.getQueue(queueName);
                this.serverRef.routingToQueueDo(draft, queue, 0L);
            }
        }
        return true;
    }

    public void onStopAfter() {
        this.saveDo();
        this.isStarted.set(false);
    }

    public void onSave() {
        this.saveDo();
    }

    private void saveDo() {
        if (!this.isStarted.get()) {
            return;
        }
        if (this.inSaveProcess.get()) {
            return;
        }
        this.inSaveProcess.set(true);
        try {
            this.saveSubscribeMap();
            this.saveQueue();
        }
        finally {
            this.inSaveProcess.set(false);
        }
    }

    private void saveSubscribeMap() {
        try {
            this.saveSubscribeMapDo();
            log.info("Server persistent saveSubscribeMap completed");
        }
        catch (Exception e) {
            log.warn("Server persistent saveSubscribeMap failed");
        }
    }

    private void saveSubscribeMapDo() throws Exception {
        Map subscribeMap = this.serverRef.getSubscribeMap();
        if (subscribeMap.size() == 0) {
            return;
        }
        String subscribeMapFileName = "subscribe-map.fdb";
        String subscribeMapFileNameTmp = subscribeMapFileName + ".tmp";
        File subscribeMapFileTmp = new File(this.directory, subscribeMapFileNameTmp);
        if (!subscribeMapFileTmp.exists()) {
            subscribeMapFileTmp.createNewFile();
        }
        Iterator subscribeIterator = subscribeMap.entrySet().iterator();
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(subscribeMapFileTmp));){
            while (subscribeIterator.hasNext()) {
                Map.Entry kv = subscribeIterator.next();
                String topic = (String)kv.getKey();
                Set topicConsumerList = (Set)kv.getValue();
                ONode topicJson = new ONode(Options.def().add(new Feature[]{Feature.DisThreadLocal}));
                topicJson.set("topic", (Object)topic);
                topicJson.getOrNew("queues").addAll((Collection)topicConsumerList);
                writer.write(topicJson.toJson());
                writer.newLine();
            }
        }
        this.saveCommit(subscribeMapFileTmp, subscribeMapFileName);
    }

    private void saveQueue() {
        Map subscribeMap = this.serverRef.getSubscribeMap();
        if (subscribeMap.size() == 0) {
            return;
        }
        Iterator subscribeIterator = subscribeMap.entrySet().iterator();
        HashSet queueNameSet = new HashSet();
        while (subscribeIterator.hasNext()) {
            Map.Entry kv = subscribeIterator.next();
            if (kv.getValue() == null) continue;
            queueNameSet.addAll((Collection)kv.getValue());
        }
        for (Map.Entry kv : this.serverRef.getQueueMap().entrySet()) {
            String queueName = (String)kv.getKey();
            MqQueue queue = (MqQueue)kv.getValue();
            try {
                this.saveQueue1(queueName, (MqQueueDefault)queue);
                log.info("Server persistent messageQueue completed, queueName={}", (Object)queueName);
            }
            catch (Exception e) {
                log.warn("Server persistent messageQueue failed, queueName={}", (Object)queueName, (Object)e);
            }
        }
        log.info("Server persistent saveQueue completed");
    }

    private void saveQueue1(String queueName, MqQueueDefault queue) throws IOException {
        String queueFileName;
        String queueFileNameTmp;
        File queueFileTmp;
        String[] topicConsumerGroupAry = queueName.split("#");
        File topicDir = new File(this.directory, topicConsumerGroupAry[0]);
        if (!topicDir.exists()) {
            topicDir.mkdirs();
        }
        if (!(queueFileTmp = new File(this.directory, queueFileNameTmp = (queueFileName = topicConsumerGroupAry[0] + "/" + topicConsumerGroupAry[1] + file_suffix) + ".tmp")).exists()) {
            queueFileTmp.createNewFile();
        }
        if (queue != null) {
            block20: {
                Iterator messageIterator = queue.getMessageMap().entrySet().iterator();
                BufferedWriter writer = new BufferedWriter(new FileWriter(queueFileTmp));
                Throwable throwable = null;
                block11: while (true) {
                    try {
                        while (messageIterator.hasNext()) {
                            Map.Entry kv = messageIterator.next();
                            MqMessageHolder messageHolder = (MqMessageHolder)kv.getValue();
                            if (messageHolder.isDone()) continue;
                            try {
                                EntityDefault entity = messageHolder.getEntity();
                                ONode entityJson = new ONode(Options.def().add(new Feature[]{Feature.DisThreadLocal}));
                                entityJson.set("v", (Object)2);
                                entityJson.set("meta", (Object)entity.metaString());
                                entityJson.set("data", (Object)Base64.getEncoder().encodeToString(entity.dataAsBytes()));
                                writer.write(entityJson.toJson());
                                writer.newLine();
                                continue block11;
                            }
                            catch (Exception e) {
                                log.warn("Server persistent message failed, key={}", (Object)messageHolder.getKey(), (Object)e);
                            }
                        }
                        break block20;
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                }
                finally {
                    if (writer != null) {
                        if (throwable != null) {
                            try {
                                writer.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                        } else {
                            writer.close();
                        }
                    }
                }
            }
            this.saveCommit(queueFileTmp, queueFileName);
        } else {
            MqSnapshotStore.saveSnapshotFile(queueFileTmp, "");
            this.saveCommit(queueFileTmp, queueFileName);
        }
    }

    private static String readSnapshotFile(File file) throws IOException {
        try (FileInputStream input = new FileInputStream(file);){
            byte[] bytes;
            byte[] contentBytes = bytes = IoUtils.transferToBytes((InputStream)input);
            String string = new String(contentBytes, StandardCharsets.UTF_8);
            return string;
        }
    }

    private static void saveSnapshotFile(File file, String content) throws IOException {
        byte[] contentBytes;
        byte[] bytes = contentBytes = content.getBytes(StandardCharsets.UTF_8);
        try (ByteArrayInputStream input = new ByteArrayInputStream(bytes);
             FileOutputStream out = new FileOutputStream(file);){
            IoUtils.transferTo((InputStream)input, (OutputStream)out);
        }
    }
}

