/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.client;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.noear.folkmq.client.MqClient;
import org.noear.folkmq.client.MqClientInternal;
import org.noear.folkmq.client.MqConsumerHandler;
import org.noear.folkmq.client.MqMessageImpl;
import org.noear.folkmq.client.Subscription;
import org.noear.socketd.SocketD;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.EntityDefault;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.BuilderListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Socket.D based implementation of the FolkMQ client.
 *
 * <p>The instance acts as its own session listener: it extends {@link BuilderListener},
 * registers a handler for the {@code mq.cmd.distribute} event and routes every distributed
 * message to the {@link MqConsumerHandler} that was subscribed for the message's
 * {@code mq.topic} meta value.
 *
 * <p>NOTE(review): {@code subscribeMap} is a plain {@link HashMap} and {@code autoAck} is a
 * plain field. If the transport invokes the distribute handler on a different thread than the
 * one calling {@code subscribe}/{@code autoAck}, both may need to be made thread-safe —
 * confirm against Socket.D's threading model.
 */
public class MqClientImpl extends BuilderListener implements MqClientInternal {
    private static final Logger log = LoggerFactory.getLogger(MqClientImpl.class);

    /** Event names exchanged with the server. */
    private static final String EVENT_DISTRIBUTE = "mq.cmd.distribute";
    private static final String EVENT_SUBSCRIBE = "mq.cmd.subscribe";
    private static final String EVENT_PUBLISH = "mq.cmd.publish";

    /** Meta keys carried on the entities. */
    private static final String META_TOPIC = "mq.topic";
    private static final String META_IDENTITY = "mq.identity";
    private static final String META_ACK = "mq.ack";

    /** Server address with the {@code folkmq://} scheme rewritten to {@code sd:tcp://}. */
    private final String serverUrl;
    /** Session opened by the constructor; used for every send and reply. */
    private final Session session;
    /** Consumer handlers keyed by topic. */
    private final Map<String, MqConsumerHandler> subscribeMap = new HashMap<>();
    /** When {@code true}, a successfully handled message is acknowledged automatically. */
    private boolean autoAck = true;

    /**
     * Opens a client session to the given server.
     *
     * @param serverUrl server address; any {@code folkmq://} occurrence is replaced by
     *                  {@code sd:tcp://} before the Socket.D client is created
     * @throws Exception if the session cannot be opened
     */
    public MqClientImpl(String serverUrl) throws Exception {
        this.serverUrl = serverUrl.replace("folkmq://", "sd:tcp://");

        // Register the routing before the listener is handed to the transport, so the
        // handler is already in place for anything the opened session delivers.
        this.on(EVENT_DISTRIBUTE, (s, m) -> this.onDistributeEvent(m));

        this.session = SocketD.createClient(this.serverUrl).listen(this).open();
    }

    /**
     * Records the subscription's handler for the topic and sends a subscribe request
     * carrying the topic and the subscription identity.
     *
     * @param topic        topic to subscribe to
     * @param subscription supplies the consumer handler and the identity
     * @throws IOException if the request fails
     */
    @Override
    public void subscribe(String topic, Subscription subscription) throws IOException {
        this.subscribeMap.put(topic, subscription.getHandler());

        EntityDefault entity = new StringEntity("")
                .meta(META_TOPIC, topic)
                .meta(META_IDENTITY, subscription.getIdentity());
        this.session.sendAndRequest(EVENT_SUBSCRIBE, entity);
    }

    /**
     * Sends a message to the given topic.
     *
     * @param topic   target topic
     * @param message message payload
     * @return a future that is completed (with {@code null}) when the reply callback of the
     *         publish request is invoked
     * @throws IOException if the message cannot be sent
     */
    @Override
    public CompletableFuture<?> publish(String topic, String message) throws IOException {
        EntityDefault entity = new StringEntity(message).meta(META_TOPIC, topic);
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.session.sendAndSubscribe(EVENT_PUBLISH, entity, r -> future.complete(null));
        return future;
    }

    /**
     * Switches automatic acknowledgement of successfully handled messages on or off.
     *
     * @param auto {@code true} to acknowledge automatically
     * @return this client
     */
    @Override
    public MqClient autoAck(boolean auto) {
        this.autoAck = auto;
        return this;
    }

    /**
     * Replies to a distributed message with an {@code mq.ack} meta of {@code "1"} (ok) or
     * {@code "0"} (not ok).
     *
     * @param message the distributed message being answered
     * @param isOk    whether the message was consumed successfully
     * @throws IOException if the reply cannot be sent
     */
    @Override
    public void acknowledge(Message message, boolean isOk) throws IOException {
        String ackFlag = isOk ? "1" : "0";
        this.session.replyEnd(message, new StringEntity("").meta(META_ACK, ackFlag));
    }

    /**
     * Handles one {@code mq.cmd.distribute} event: runs the consumer and acknowledges.
     *
     * <p>Only the consumer call is guarded. A consumer failure is logged and answered with
     * a negative acknowledgement (regardless of {@code autoAck}); a failure while sending
     * the positive acknowledgement is propagated instead of triggering a second reply for
     * the same message.
     */
    private void onDistributeEvent(Message message) throws IOException {
        String topic = message.meta(META_TOPIC);

        try {
            this.onDistribute(topic, message);
        } catch (Exception e) {
            log.warn("Consumer handler failed, topic={}", topic, e);
            this.acknowledge(message, false);
            return;
        }

        if (this.autoAck) {
            this.acknowledge(message, true);
        }
    }

    /** Passes the message to the handler subscribed for the topic; no-op when none exists. */
    private void onDistribute(String topic, Message message) throws IOException {
        MqConsumerHandler handler = this.subscribeMap.get(topic);
        if (handler != null) {
            handler.handle(topic, new MqMessageImpl(this, message));
        }
    }

    /**
     * Delegates to the parent listener, then logs the error at warn level.
     *
     * @param session the session the error belongs to
     * @param error   the reported error
     */
    @Override
    public void onError(Session session, Throwable error) {
        super.onError(session, error);
        // A lone Throwable argument is taken by SLF4J as the exception, not as a format
        // argument, so a "{}" pattern would be printed literally; use a real message.
        log.warn("FolkMQ client session error", error);
    }
}

