/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.messaging.state;

import io.atomix.catalyst.transport.Address;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.session.Session;
import io.atomix.messaging.DistributedMessageBus;
import io.atomix.messaging.state.MessageBusCommands;
import io.atomix.resource.ResourceStateMachine;
import io.atomix.resource.ResourceType;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class MessageBusState
extends ResourceStateMachine
implements SessionListener {
    private final Map<Long, Commit<MessageBusCommands.Join>> members = new HashMap<Long, Commit<MessageBusCommands.Join>>();
    private final Map<String, Map<Long, Commit<MessageBusCommands.Register>>> topics = new HashMap<String, Map<Long, Commit<MessageBusCommands.Register>>>();

    public MessageBusState() {
        super(new ResourceType(DistributedMessageBus.class));
    }

    @Override
    public void close(ServerSession session) {
        this.members.remove(session.id());
        for (Commit<MessageBusCommands.Join> member : this.members.values()) {
            if (member.session().state() != Session.State.OPEN) continue;
            member.session().publish("leave", session.id());
        }
    }

    public Map<String, Set<Address>> join(Commit<MessageBusCommands.Join> commit) {
        try {
            this.members.put(commit.session().id(), commit);
            HashMap<String, Set<Address>> topics = new HashMap<String, Set<Address>>();
            for (Map.Entry<String, Map<Long, Commit<MessageBusCommands.Register>>> entry : this.topics.entrySet()) {
                HashSet<Address> addresses = new HashSet<Address>();
                for (Map.Entry<Long, Commit<MessageBusCommands.Register>> subEntry : entry.getValue().entrySet()) {
                    Commit<MessageBusCommands.Join> member = this.members.get(subEntry.getKey());
                    if (member == null) continue;
                    addresses.add(member.operation().member());
                }
                topics.put(entry.getKey(), addresses);
            }
            return topics;
        }
        catch (Exception e) {
            commit.close();
            throw e;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void leave(Commit<MessageBusCommands.Leave> commit) {
        try {
            Commit<MessageBusCommands.Join> previous = this.members.remove(commit.session().id());
            if (previous != null) {
                previous.close();
                Iterator<Map.Entry<String, Map<Long, Commit<MessageBusCommands.Register>>>> iterator = this.topics.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<String, Map<Long, Commit<MessageBusCommands.Register>>> entry = iterator.next();
                    String topic = entry.getKey();
                    Map<Long, Commit<MessageBusCommands.Register>> registrations = entry.getValue();
                    Commit<MessageBusCommands.Register> registration = registrations.remove(commit.session().id());
                    if (registration == null) continue;
                    for (Commit<MessageBusCommands.Join> member : this.members.values()) {
                        member.session().publish("unregister", new MessageBusCommands.ConsumerInfo(topic, previous.operation().member()));
                    }
                    if (!registrations.isEmpty()) continue;
                    iterator.remove();
                }
            }
        }
        finally {
            commit.close();
        }
    }

    public void registerConsumer(Commit<MessageBusCommands.Register> commit) {
        try {
            Commit<MessageBusCommands.Join> parent = this.members.get(commit.session().id());
            if (parent == null) {
                throw new IllegalStateException("unknown session: " + commit.session().id());
            }
            Map registrations = this.topics.computeIfAbsent(commit.operation().topic(), t -> new HashMap());
            registrations.put(commit.session().id(), commit);
            for (Commit<MessageBusCommands.Join> member : this.members.values()) {
                member.session().publish("register", new MessageBusCommands.ConsumerInfo(commit.operation().topic(), parent.operation().member()));
            }
        }
        catch (Exception e) {
            commit.close();
            throw e;
        }
    }

    public void unregisterConsumer(Commit<MessageBusCommands.Unregister> commit) {
        try {
            Commit<MessageBusCommands.Register> registration;
            Map<Long, Commit<MessageBusCommands.Register>> registrations = this.topics.get(commit.operation().topic());
            if (registrations != null && (registration = registrations.remove(commit.session().id())) != null) {
                registration.close();
                Commit<MessageBusCommands.Join> parent = this.members.get(registration.session().id());
                if (parent != null) {
                    for (Commit<MessageBusCommands.Join> member : this.members.values()) {
                        member.session().publish("unregister", new MessageBusCommands.ConsumerInfo(commit.operation().topic(), parent.operation().member()));
                    }
                }
            }
        }
        catch (Exception e) {
            commit.close();
            throw e;
        }
    }

    @Override
    public void delete() {
        this.members.values().forEach(Commit::close);
        this.topics.values().forEach(m -> m.values().forEach(Commit::close));
        this.topics.clear();
    }
}

