/*
 * Decompiled with CFR 0.152.
 */
package org.noear.folkmq.client;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.noear.folkmq.client.IMqMessage;
import org.noear.folkmq.client.MqClient;
import org.noear.folkmq.client.MqClientInternal;
import org.noear.folkmq.client.MqConsumeHandler;
import org.noear.folkmq.client.MqMessageReceivedImpl;
import org.noear.folkmq.client.MqSubscription;
import org.noear.folkmq.exception.FolkmqException;
import org.noear.socketd.SocketD;
import org.noear.socketd.exception.SocketdAlarmException;
import org.noear.socketd.exception.SocketdConnectionException;
import org.noear.socketd.transport.client.Client;
import org.noear.socketd.transport.client.ClientConfigHandler;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.EntityDefault;
import org.noear.socketd.transport.core.entity.StringEntity;
import org.noear.socketd.transport.core.listener.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqClientDefault
extends EventListener
implements MqClientInternal {
    private static final Logger log = LoggerFactory.getLogger(MqClientDefault.class);
    private String serverUrl;
    private Client client;
    private Session clientSession;
    private ClientConfigHandler clientConfigHandler;
    private Map<String, MqSubscription> subscriptionMap = new HashMap<String, MqSubscription>();
    private boolean autoAcknowledge = true;
    private int publishRetryTimes = 0;

    public MqClientDefault(String serverUrl) {
        this.serverUrl = serverUrl.replace("folkmq://", "sd:tcp://");
        this.on("mq.event.distribute", (s, m) -> {
            MqMessageReceivedImpl message = null;
            try {
                message = new MqMessageReceivedImpl(this, (Message)m);
                MqSubscription subscription = this.subscriptionMap.get(message.getTopic());
                if (subscription != null) {
                    subscription.consume(message);
                }
                if (this.autoAcknowledge) {
                    this.acknowledge(message, true);
                }
            }
            catch (Throwable e) {
                if (message != null) {
                    this.acknowledge(message, false);
                    log.warn("Client consumer handle error, tid={}", (Object)message.getTid(), (Object)e);
                }
                log.warn("Client consumer handle error", e);
            }
        });
    }

    @Override
    public MqClient connect() throws IOException {
        this.client = SocketD.createClient((String)this.serverUrl);
        if (this.clientConfigHandler != null) {
            this.client.config(this.clientConfigHandler);
        }
        this.clientSession = this.client.listen((Listener)this).open();
        return this;
    }

    @Override
    public void disconnect() throws IOException {
        if (this.clientSession != null) {
            this.clientSession.close();
        }
    }

    @Override
    public MqClient config(ClientConfigHandler configHandler) {
        this.clientConfigHandler = configHandler;
        return this;
    }

    @Override
    public MqClient autoAcknowledge(boolean auto) {
        this.autoAcknowledge = auto;
        return this;
    }

    @Override
    public MqClient publishRetryTimes(int times) {
        this.publishRetryTimes = times;
        return this;
    }

    @Override
    public void subscribe(String topic, String consumer, MqConsumeHandler consumerHandler) throws IOException {
        MqSubscription subscription = new MqSubscription(topic, consumer, consumerHandler);
        this.subscriptionMap.put(topic, subscription);
        if (this.clientSession != null && this.clientSession.isValid()) {
            EntityDefault entity = new StringEntity("").meta("mq.topic", subscription.getTopic()).meta("mq.consumer", subscription.getConsumer()).at("folkmq-server*");
            this.clientSession.sendAndRequest("mq.event.subscribe", (Entity)entity);
            log.info("Client subscribe successfully: {}#{}", (Object)topic, (Object)consumer);
        }
    }

    @Override
    public void unsubscribe(String topic, String consumer) throws IOException {
        this.subscriptionMap.remove(topic);
        if (this.clientSession != null && this.clientSession.isValid()) {
            EntityDefault entity = new StringEntity("").meta("mq.topic", topic).meta("mq.consumer", consumer).at("folkmq-server*");
            this.clientSession.sendAndRequest("mq.event.unsubscribe", (Entity)entity);
            log.info("Client unsubscribe successfully: {}#{}", (Object)topic, (Object)consumer);
        }
    }

    @Override
    public void publish(String topic, IMqMessage message) throws IOException {
        if (this.clientSession == null) {
            throw new SocketdConnectionException("Not connected!");
        }
        Entity entity = this.publishEntityBuild(topic, message);
        if (message.getQos() > 0) {
            if (this.publishRetryTimes > 0) {
                int times = this.publishRetryTimes;
                while (times > 0) {
                    try {
                        Entity resp = this.clientSession.sendAndRequest("mq.event.publish", entity);
                        int confirm = Integer.parseInt(resp.metaOrDefault("mq.confirm", "0"));
                        if (confirm != 1) {
                            String messsage = "Client message publish confirm failed: " + resp.dataAsString();
                            throw new FolkmqException(messsage);
                        }
                        break;
                    }
                    catch (Throwable e) {
                        if (--times != 0) continue;
                        throw e;
                    }
                }
            }
        } else {
            this.clientSession.send("mq.event.publish", entity);
        }
    }

    @Override
    public CompletableFuture<Boolean> publishAsync(String topic, IMqMessage message) throws IOException {
        if (this.clientSession == null) {
            throw new SocketdConnectionException("Not connected!");
        }
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        Entity entity = this.publishEntityBuild(topic, message);
        if (message.getQos() > 0) {
            this.clientSession.sendAndRequest("mq.event.publish", entity, r -> {
                int confirm = Integer.parseInt(r.metaOrDefault("mq.confirm", "0"));
                if (confirm == 1) {
                    future.complete(true);
                } else {
                    String messsage = "Client message publish confirm failed: " + r.dataAsString();
                    future.completeExceptionally(new FolkmqException(messsage));
                }
            });
        } else {
            this.clientSession.send("mq.event.publish", entity);
            future.complete(true);
        }
        return future;
    }

    private Entity publishEntityBuild(String topic, IMqMessage message) {
        StringEntity entity = new StringEntity(message.getContent());
        entity.meta("mq.tid", message.getTid());
        entity.meta("mq.topic", topic);
        entity.meta("mq.qos", message.getQos() == 0 ? "0" : "1");
        if (message.getScheduled() == null) {
            entity.meta("mq.scheduled", "0");
        } else {
            entity.meta("mq.scheduled", String.valueOf(message.getScheduled().getTime()));
        }
        entity.at("folkmq-server");
        return entity;
    }

    @Override
    public void acknowledge(MqMessageReceivedImpl message, boolean isOk) throws IOException {
        if (message.getQos() > 0) {
            this.clientSession.replyEnd(message.from, (Entity)new StringEntity("").meta("mq.ack", isOk ? "1" : "0"));
        }
    }

    public void onOpen(Session session) throws IOException {
        super.onOpen(session);
        log.info("Client session opened, sessionId={}", (Object)session.sessionId());
        for (MqSubscription subscription : this.subscriptionMap.values()) {
            EntityDefault entity = new StringEntity("").meta("mq.topic", subscription.getTopic()).meta("mq.consumer", subscription.getConsumer()).at("folkmq-server");
            session.send("mq.event.subscribe", (Entity)entity);
        }
    }

    public void onClose(Session session) {
        super.onClose(session);
        log.info("Client session closed, sessionId={}", (Object)session.sessionId());
    }

    public void onError(Session session, Throwable error) {
        super.onError(session, error);
        if (log.isWarnEnabled()) {
            if (error instanceof SocketdAlarmException) {
                SocketdAlarmException alarmException = (SocketdAlarmException)error;
                log.warn("Client error, sessionId={}, from={}", new Object[]{session.sessionId(), alarmException.getFrom(), error});
            } else {
                log.warn("Client error, sessionId={}", (Object)session.sessionId(), (Object)error);
            }
        }
    }
}

