package com.github.netty.protocol.mqtt;

import com.github.netty.core.util.LoggerFactoryX;
import com.github.netty.core.util.LoggerX;
import com.github.netty.protocol.mqtt.MqttSession;
import com.github.netty.protocol.mqtt.exception.MqttSessionCorruptedException;
import com.github.netty.protocol.mqtt.subscriptions.ISubscriptionsDirectory;
import com.github.netty.protocol.mqtt.subscriptions.Subscription;
import com.github.netty.protocol.mqtt.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/* loaded from: input_file:com/github/netty/protocol/mqtt/MqttSessionRegistry.class */
public class MqttSessionRegistry {
    private static final LoggerX LOG = LoggerFactoryX.getLogger(MqttSessionRegistry.class);
    private final ISubscriptionsDirectory subscriptionsDirectory;
    private final IQueueRepository queueRepository;
    private final ConcurrentMap<String, MqttSession> pool = new ConcurrentHashMap();
    private final ConcurrentMap<String, Queue<EnqueuedMessage>> queues = new ConcurrentHashMap();

    /* loaded from: input_file:com/github/netty/protocol/mqtt/MqttSessionRegistry$EnqueuedMessage.class */
    public static abstract class EnqueuedMessage {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/netty/protocol/mqtt/MqttSessionRegistry$PostConnectAction.class */
    public enum PostConnectAction {
        NONE,
        SEND_STORED_MESSAGES
    }

    /* loaded from: input_file:com/github/netty/protocol/mqtt/MqttSessionRegistry$PubRelMarker.class */
    static final class PubRelMarker extends EnqueuedMessage {
    }

    /* loaded from: input_file:com/github/netty/protocol/mqtt/MqttSessionRegistry$PublishedMessage.class */
    static class PublishedMessage extends EnqueuedMessage {
        final Topic topic;
        final MqttQoS publishingQos;
        final ByteBuf payload;

        /* JADX INFO: Access modifiers changed from: package-private */
        public PublishedMessage(Topic topic, MqttQoS mqttQoS, ByteBuf byteBuf) {
            this.topic = topic;
            this.publishingQos = mqttQoS;
            this.payload = byteBuf;
        }
    }

    public MqttSessionRegistry(ISubscriptionsDirectory iSubscriptionsDirectory, IQueueRepository iQueueRepository) {
        this.subscriptionsDirectory = iSubscriptionsDirectory;
        this.queueRepository = iQueueRepository;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void bindToSession(MqttConnection mqttConnection, MqttConnectMessage mqttConnectMessage, String str) {
        boolean z = false;
        PostConnectAction postConnectAction = PostConnectAction.NONE;
        if (this.pool.containsKey(str)) {
            postConnectAction = bindToExistingSession(mqttConnection, mqttConnectMessage, str, createNewSession(mqttConnection, mqttConnectMessage, str));
            z = true;
        } else {
            MqttSession createNewSession = createNewSession(mqttConnection, mqttConnectMessage, str);
            if (this.pool.putIfAbsent(str, createNewSession) == null) {
                LOG.trace("case 1, not existing session with CId {}", str);
            } else {
                postConnectAction = bindToExistingSession(mqttConnection, mqttConnectMessage, str, createNewSession);
                z = true;
            }
        }
        mqttConnection.sendConnAck(!mqttConnectMessage.variableHeader().isCleanSession() && z);
        if (postConnectAction == PostConnectAction.SEND_STORED_MESSAGES) {
            this.pool.get(str).sendQueuedMessagesWhileOffline();
        }
    }

    Collection<MqttClientDescriptor> listConnectedClients() {
        return (Collection) this.pool.values().stream().filter((v0) -> {
            return v0.connected();
        }).map(this::createClientDescriptor).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).collect(Collectors.toList());
    }

    private Optional<MqttClientDescriptor> createClientDescriptor(MqttSession mqttSession) {
        String clientID = mqttSession.getClientID();
        return mqttSession.remoteAddress().map(inetSocketAddress -> {
            return new MqttClientDescriptor(clientID, inetSocketAddress.getHostString(), inetSocketAddress.getPort());
        });
    }

    private PostConnectAction bindToExistingSession(MqttConnection mqttConnection, MqttConnectMessage mqttConnectMessage, String str, MqttSession mqttSession) {
        PostConnectAction postConnectAction = PostConnectAction.NONE;
        boolean isCleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        MqttSession mqttSession2 = this.pool.get(str);
        if (isCleanSession && mqttSession2.disconnected()) {
            dropQueuesForClient(str);
            unsubscribe(mqttSession2);
            if (!mqttSession2.assignState(MqttSession.SessionStatus.DISCONNECTED, MqttSession.SessionStatus.CONNECTING)) {
                throw new MqttSessionCorruptedException("old session was already changed state");
            }
            copySessionConfig(mqttConnectMessage, mqttSession2);
            mqttSession2.bind(mqttConnection);
            if (!mqttSession2.assignState(MqttSession.SessionStatus.CONNECTING, MqttSession.SessionStatus.CONNECTED)) {
                throw new MqttSessionCorruptedException("old session moved in connected state by other thread");
            }
            if (!this.pool.replace(str, mqttSession2, mqttSession2)) {
                throw new MqttSessionCorruptedException("old session was already removed");
            }
            LOG.trace("case 2, oldSession with same CId {} disconnected", str);
        } else if (!isCleanSession && mqttSession2.disconnected()) {
            reactivateSubscriptions(mqttSession2);
            if (!mqttSession2.assignState(MqttSession.SessionStatus.DISCONNECTED, MqttSession.SessionStatus.CONNECTING)) {
                throw new MqttSessionCorruptedException("old session moved in connected state by other thread");
            }
            mqttSession2.bind(mqttConnection);
            if (!mqttSession2.assignState(MqttSession.SessionStatus.CONNECTING, MqttSession.SessionStatus.CONNECTED)) {
                throw new MqttSessionCorruptedException("old session moved in other state state by other thread");
            }
            if (!this.pool.replace(str, mqttSession2, mqttSession2)) {
                throw new MqttSessionCorruptedException("old session was already removed");
            }
            postConnectAction = PostConnectAction.SEND_STORED_MESSAGES;
            LOG.trace("case 3, oldSession with same CId {} disconnected", str);
        } else if (mqttSession2.connected()) {
            LOG.trace("case 4, oldSession with same CId {} still connected, force to close", str);
            mqttSession2.closeImmediately();
            if (!this.pool.replace(str, mqttSession2, mqttSession)) {
                throw new MqttSessionCorruptedException("old session was already removed");
            }
        }
        return postConnectAction;
    }

    private void reactivateSubscriptions(MqttSession mqttSession) {
        for (Subscription subscription : mqttSession.getSubscriptions()) {
        }
    }

    private void unsubscribe(MqttSession mqttSession) {
        Iterator<Subscription> it = mqttSession.getSubscriptions().iterator();
        while (it.hasNext()) {
            this.subscriptionsDirectory.removeSubscription(it.next().getTopicFilter(), mqttSession.getClientID());
        }
    }

    private MqttSession createNewSession(MqttConnection mqttConnection, MqttConnectMessage mqttConnectMessage, String str) {
        boolean isCleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        Queue<EnqueuedMessage> computeIfAbsent = this.queues.computeIfAbsent(str, str2 -> {
            return this.queueRepository.createQueue(str2, isCleanSession);
        });
        MqttSession mqttSession = mqttConnectMessage.variableHeader().isWillFlag() ? new MqttSession(str, isCleanSession, createWill(mqttConnectMessage), computeIfAbsent) : new MqttSession(isCleanSession, str, computeIfAbsent);
        mqttSession.markConnected();
        mqttSession.bind(mqttConnection);
        return mqttSession;
    }

    private void copySessionConfig(MqttConnectMessage mqttConnectMessage, MqttSession mqttSession) {
        mqttSession.update(mqttConnectMessage.variableHeader().isCleanSession(), mqttConnectMessage.variableHeader().isWillFlag() ? createWill(mqttConnectMessage) : null);
    }

    private MqttSession.Will createWill(MqttConnectMessage mqttConnectMessage) {
        ByteBuf copiedBuffer = Unpooled.copiedBuffer(mqttConnectMessage.payload().willMessageInBytes());
        return new MqttSession.Will(mqttConnectMessage.payload().willTopic(), copiedBuffer, MqttQoS.valueOf(mqttConnectMessage.variableHeader().willQos()), mqttConnectMessage.variableHeader().isWillRetain());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MqttSession retrieve(String str) {
        return this.pool.get(str);
    }

    public void remove(String str) {
        this.pool.remove(str);
    }

    public void disconnect(String str) {
        MqttSession retrieve = retrieve(str);
        if (retrieve == null) {
            LOG.debug("Some other thread already removed the session CId={}", str);
        } else {
            retrieve.disconnect();
        }
    }

    private void dropQueuesForClient(String str) {
        this.queues.remove(str);
    }
}
