/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.client;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.noear.folkmq.client.MqClient;
import org.noear.folkmq.client.MqClientInternal;
import org.noear.folkmq.client.MqConsumerHandler;
import org.noear.folkmq.client.MqMessageImpl;
import org.noear.folkmq.client.MqSubscription;
import org.noear.socketd.SocketD;
import org.noear.socketd.exception.SocketdConnectionException;
import org.noear.socketd.transport.client.Client;
import org.noear.socketd.transport.client.ClientConfigHandler;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.EntityDefault;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.BuilderListener;
import org.noear.socketd.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Socket.D based FolkMQ client.
 *
 * <p>The client registers itself as the session listener: messages arriving on
 * the {@code mq.cmd.distribute} event are dispatched to the subscription
 * registered for their {@code mq.topic} meta value, and every known
 * subscription is re-sent to the server whenever a session is opened.
 *
 * <p>NOTE(review): {@code subscriptionMap} is a plain {@link HashMap}; if
 * {@link #subscribe} can run concurrently with listener callbacks it may need
 * a concurrent map - confirm against the Socket.D threading model.
 */
public class MqClientImpl
extends BuilderListener
implements MqClientInternal {
    private static final Logger log = LoggerFactory.getLogger(MqClientImpl.class);
    private final String serverUrl;
    private Session clientSession;
    private ClientConfigHandler clientConfigHandler;
    private final Map<String, MqSubscription> subscriptionMap = new HashMap<String, MqSubscription>();
    private boolean autoAcknowledge = true;

    /**
     * Creates a client for the given server address.
     *
     * @param serverUrl server address; a {@code folkmq://} prefix is rewritten
     *                  to {@code sd:tcp://}
     * @throws Exception declared for interface compatibility
     */
    public MqClientImpl(String serverUrl) throws Exception {
        this.serverUrl = serverUrl.replace("folkmq://", "sd:tcp://");
        this.on("mq.cmd.distribute", (s, m) -> {
            String topic = null;
            boolean handled = true;
            try {
                topic = m.meta("mq.topic");
                MqSubscription subscription = this.subscriptionMap.get(topic);
                if (subscription != null) {
                    subscription.handle(new MqMessageImpl(this, m));
                }
            }
            catch (Throwable e) {
                // Do not lose the consumer failure silently: log it, then reply negatively below.
                handled = false;
                log.warn("Message handling failed, topic={}", topic, e);
            }
            // Acknowledge outside the try block so that a failure while sending the
            // positive reply is not followed by a second (negative) reply.
            if (!handled) {
                this.acknowledge(m, false);
            } else if (this.autoAcknowledge) {
                this.acknowledge(m, true);
            }
        });
    }

    /**
     * Registers a handler used to customise the Socket.D client configuration
     * when {@link #connect()} is called.
     *
     * @param configHandler configuration callback, may be {@code null}
     * @return this client
     */
    @Override
    public MqClient config(ClientConfigHandler configHandler) {
        this.clientConfigHandler = configHandler;
        return this;
    }

    /**
     * Creates the Socket.D client, applies configuration and opens the session.
     *
     * @return this client
     * @throws Exception if the session cannot be opened
     */
    @Override
    public MqClient connect() throws Exception {
        Client client = SocketD.createClient(this.serverUrl);
        // Apply the built-in default first so that a user supplied handler can override it.
        client.config(c -> c.heartbeatInterval(5000L));
        if (this.clientConfigHandler != null) {
            client.config(this.clientConfigHandler);
        }
        this.clientSession = client.listen(this).open();
        return this;
    }

    /**
     * Closes the current session, if one has been opened.
     *
     * @throws IOException if closing the session fails
     */
    @Override
    public void disconnect() throws IOException {
        if (this.clientSession != null) {
            this.clientSession.close();
        }
    }

    /**
     * Enables or disables automatic positive acknowledgement after a message
     * has been dispatched without error.
     *
     * @param auto {@code true} to acknowledge automatically
     * @return this client
     */
    @Override
    public MqClient autoAcknowledge(boolean auto) {
        this.autoAcknowledge = auto;
        return this;
    }

    /**
     * Registers a subscription for a topic, replacing any previous one for the
     * same topic, and sends it to the server when a valid session exists.
     * Otherwise it is sent when a session is opened.
     *
     * @param topic           topic name
     * @param consumer        consumer identity
     * @param consumerHandler handler invoked for distributed messages
     * @throws IOException if the subscribe request fails
     */
    @Override
    public void subscribe(String topic, String consumer, MqConsumerHandler consumerHandler) throws IOException {
        MqSubscription subscription = new MqSubscription(topic, consumer, consumerHandler);
        this.subscriptionMap.put(topic, subscription);
        if (this.clientSession != null && this.clientSession.isValid()) {
            this.clientSession.sendAndRequest("mq.cmd.subscribe", this.subscribeEntity(subscription));
        }
    }

    /**
     * Publishes a message without a schedule.
     *
     * @param topic   topic name
     * @param content message body
     * @return future completed with {@code null} when the server replies
     * @throws IOException if sending fails
     */
    @Override
    public CompletableFuture<?> publish(String topic, String content) throws IOException {
        return this.publish(topic, content, null);
    }

    /**
     * Publishes a message, optionally scheduled.
     *
     * @param topic     topic name
     * @param content   message body
     * @param scheduled scheduled time, or {@code null}; sent as its epoch
     *                  millisecond value, {@code "0"} when {@code null}
     * @return future completed with {@code null} when the server replies
     * @throws IOException if sending fails
     * @throws SocketdConnectionException if {@link #connect()} has not been called
     */
    @Override
    public CompletableFuture<?> publish(String topic, String content, Date scheduled) throws IOException {
        if (this.clientSession == null) {
            throw new SocketdConnectionException("Not connected!");
        }
        StringEntity entity = new StringEntity(content);
        entity.meta("mq.tid", Utils.guid());
        entity.meta("mq.topic", topic);
        entity.meta("mq.scheduled", scheduled == null ? "0" : String.valueOf(scheduled.getTime()));
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.clientSession.sendAndSubscribe("mq.cmd.publish", entity, r -> future.complete(null));
        return future;
    }

    /**
     * Replies to a distributed message with an {@code mq.ack} meta of
     * {@code "1"} (ok) or {@code "0"} (not ok).
     *
     * @param message the message being acknowledged
     * @param isOk    whether consumption succeeded
     * @throws IOException if the reply cannot be sent
     * @throws SocketdConnectionException if no session has been assigned yet
     */
    @Override
    public void acknowledge(Message message, boolean isOk) throws IOException {
        if (this.clientSession == null) {
            // Same failure mode as publish() instead of a bare NullPointerException.
            throw new SocketdConnectionException("Not connected!");
        }
        this.clientSession.replyEnd(message, new StringEntity("").meta("mq.ack", isOk ? "1" : "0"));
    }

    /**
     * Sends every registered subscription to the newly opened session.
     *
     * @param session the opened session
     * @throws IOException if a subscribe message cannot be sent
     */
    @Override
    public void onOpen(Session session) throws IOException {
        super.onOpen(session);
        log.info("Client session opened, session={}", session.sessionId());
        for (MqSubscription subscription : this.subscriptionMap.values()) {
            session.send("mq.cmd.subscribe", this.subscribeEntity(subscription));
        }
    }

    /**
     * Logs the closing of a session.
     *
     * @param session the closed session
     */
    @Override
    public void onClose(Session session) {
        super.onClose(session);
        log.info("Client session closed, session={}", session.sessionId());
    }

    /**
     * Logs a session error at warn level.
     *
     * @param session the session on which the error occurred
     * @param error   the error
     */
    @Override
    public void onError(Session session, Throwable error) {
        super.onError(session, error);
        if (log.isWarnEnabled()) {
            log.warn("Client error, session={}", session.sessionId(), error);
        }
    }

    /** Builds the entity carrying the topic and consumer metas of a subscribe command. */
    private Entity subscribeEntity(MqSubscription subscription) {
        return new StringEntity("")
                .meta("mq.topic", subscription.getTopic())
                .meta("mq.consumer", subscription.getConsumer());
    }
}

