/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.messaging;

import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.util.ConfigurationException;
import io.atomix.catalyst.util.concurrent.Futures;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.messaging.Message;
import io.atomix.messaging.MessageConsumer;
import io.atomix.messaging.MessageProducer;
import io.atomix.messaging.state.MessageBusCommands;
import io.atomix.messaging.state.MessageBusState;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

@ResourceTypeInfo(id=-30, stateMachine=MessageBusState.class, typeResolver=MessageBusCommands.TypeResolver.class)
public class DistributedMessageBus
extends Resource<DistributedMessageBus> {
    // Transport client used to open outbound connections (see createConnection); assigned in listen().
    private Client client;
    // Transport server that listens for inbound connections; assigned in listen() and closed in close().
    private Server server;
    // Message bus options; open() reads the listen address from here.
    private final Options options;
    // Outbound connections, keyed by Address.hashCode().
    // NOTE(review): two distinct addresses with equal hash codes would share one entry - confirm this is acceptable.
    private final Map<Integer, Connection> connections = new HashMap<Integer, Connection>();
    // Non-null while a listen() call is waiting for the server to start listening.
    private volatile CompletableFuture<DistributedMessageBus> openFuture;
    // Non-null while a close() call is waiting for the server to close.
    private volatile CompletableFuture<Void> closeFuture;
    // Topic name -> remote consumer addresses; filled from the Join result and the "register"/"unregister" events.
    private final Map<String, RemoteConsumers> remotes = new ConcurrentHashMap<String, RemoteConsumers>();
    // Topic name -> consumer registered through this instance.
    private final Map<String, InternalMessageConsumer> consumers = new ConcurrentHashMap<String, InternalMessageConsumer>();
    // Set to true once the server is listening; set to false when listening fails or the server has been closed.
    private volatile boolean open;

    /**
     * Creates a new message bus {@link Options} instance.
     *
     * @return a freshly constructed {@link Options}
     */
    public static Options options() {
        return new Options();
    }

    /**
     * Creates a new {@link Resource.Config} instance.
     *
     * @return a freshly constructed {@link Resource.Config}
     */
    public static Resource.Config config() {
        return new Resource.Config();
    }

    /**
     * Creates the message bus resource.
     *
     * @param client  the client handed to the {@link Resource} superclass
     * @param options the resource options; handed to the superclass and also wrapped in a
     *                message bus {@link Options} through its {@code Properties} constructor
     */
    public DistributedMessageBus(CopycatClient client, Resource.Options options) {
        super(client, options);
        this.options = new Options(options);
    }

    /**
     * Opens the underlying resource and then starts listening on the configured address.
     *
     * <p>The address is read from the options before the resource is opened, so a
     * configuration problem reported by {@link Options#getAddress()} is thrown from this
     * call rather than delivered through the returned future.
     *
     * @return a future that yields the result of {@link #listen(Address)}
     */
    @Override
    public CompletableFuture<DistributedMessageBus> open() {
        final Address listenAddress = this.options.getAddress();
        return super.open()
            .thenCompose(opened -> this.listen(listenAddress));
    }

    /**
     * Starts the transport server on {@code address} and then submits a Join command.
     *
     * <p>While the server is still starting, every caller receives the same future. That
     * future completes with this bus only after both the server is listening and the Join
     * command has been processed; it completes exceptionally if either step fails.
     *
     * @param address the address to listen on and to announce in the Join command
     * @return a future completed with this instance once listening and joined
     */
    public synchronized CompletableFuture<DistributedMessageBus> listen(Address address) {
        if (this.openFuture != null) {
            return this.openFuture;
        }
        // super.client is the client held by Resource; the unqualified field is the transport client.
        this.client = super.client.transport().client();
        this.server = super.client.transport().server();

        // Completed when the server has (or has failed to) start listening.
        CompletableFuture<Void> listening = new CompletableFuture<>();
        CompletableFuture<DistributedMessageBus> pending = listening
            .thenCompose(v -> this.submitJoin(address))
            .thenApply(v -> this);
        // Publish the composed future so concurrent callers wait for the join and receive the bus.
        this.openFuture = pending;

        super.client.context().execute(() -> this.server.listen(address, this::connectListener).whenComplete((result, error) -> {
            synchronized (this) {
                this.open = error == null;
                this.openFuture = null;
            }
            // Complete outside the monitor so the dependent Join submission does not run under the lock.
            if (error == null) {
                listening.complete(null);
            } else {
                listening.completeExceptionally((Throwable)error);
            }
        }));
        return pending;
    }

    /**
     * Submits a Join command for {@code address}, records the returned topic registrations in
     * {@code remotes}, and subscribes to the "register" and "unregister" events.
     */
    private CompletableFuture<Void> submitJoin(Address address) {
        CompletableFuture<Void> joined = new CompletableFuture<>();
        this.submit(new MessageBusCommands.Join(address)).whenComplete((topics, error) -> {
            if (error == null) {
                for (Map.Entry entry : topics.entrySet()) {
                    this.remotes.put((String)entry.getKey(), new RemoteConsumers((Set)entry.getValue()));
                }
                super.client.onEvent("register", this::registerConsumer);
                super.client.onEvent("unregister", this::unregisterConsumer);
                joined.complete(null);
            } else {
                joined.completeExceptionally((Throwable)error);
            }
        });
        return joined;
    }

    /**
     * Reports whether the server is currently listening.
     *
     * @return the value of the {@code open} flag, which is set once the server has started
     *         listening and cleared when listening fails or the server has been closed
     */
    @Override
    public boolean isOpen() {
        return this.open;
    }

    /**
     * Callback handed to {@code server.listen}: registers {@link #handleMessage(Message)} as the
     * handler for {@link Message} objects on the given connection.
     *
     * @param connection the connection to attach the message handler to
     */
    private void connectListener(Connection connection) {
        connection.handler(Message.class, this::handleMessage);
    }

    /**
     * Handles a "register" event by recording the announced address as a consumer of the topic.
     *
     * @param info the topic and address of the consumer that registered
     */
    private void registerConsumer(MessageBusCommands.ConsumerInfo info) {
        RemoteConsumers known = this.remotes.get(info.topic());
        if (known != null) {
            // Topic already tracked: append the address to its consumer list.
            known.add(info.address());
            return;
        }
        // First consumer seen for this topic: start a new list holding just this address.
        RemoteConsumers created = new RemoteConsumers(Collections.singleton(info.address()));
        this.remotes.put(info.topic(), created);
    }

    /**
     * Handles an "unregister" event by dropping the address from the topic's consumer list,
     * and dropping the topic itself once no consumers remain.
     *
     * @param info the topic and address of the consumer that unregistered
     */
    private void unregisterConsumer(MessageBusCommands.ConsumerInfo info) {
        RemoteConsumers known = this.remotes.get(info.topic());
        if (known == null) {
            return;
        }
        // remove() reports whether the list is empty afterwards.
        boolean nowEmpty = known.remove(info.address());
        if (nowEmpty) {
            this.remotes.remove(info.topic());
        }
    }

    /**
     * Dispatches an incoming message to the consumer registered locally for its topic.
     *
     * @param message the received message
     * @return the consumer's result future, or a future failed with
     *         {@link IllegalStateException} when no local consumer exists for the topic
     */
    private CompletableFuture<Void> handleMessage(Message message) {
        InternalMessageConsumer local = this.consumers.get(message.topic());
        if (local != null) {
            return local.consume(message.body());
        }
        return Futures.exceptionalFuture(new IllegalStateException("unknown topic " + message.topic()));
    }

    /**
     * Creates a producer for the given topic.
     *
     * @param topic the topic the producer sends to
     * @param <T>   the message type
     * @return an already-completed future holding the new producer
     */
    public <T> CompletableFuture<MessageProducer<T>> producer(String topic) {
        MessageProducer<T> created = new InternalMessageProducer<T>(topic);
        return CompletableFuture.completedFuture(created);
    }

    /**
     * Registers a consumer for the topic without a message handler.
     *
     * <p>Delegates to {@link #consumer(String, Function)} with a {@code null} handler.
     *
     * @param topic the topic to consume
     * @param <T>   the message type
     * @return the future returned by the two-argument overload
     */
    public <T> CompletableFuture<MessageConsumer<T>> consumer(String topic) {
        return this.consumer(topic, null);
    }

    /**
     * Submits a Register command for the topic and, once it succeeds, installs a local
     * consumer for that topic (replacing any consumer previously stored under the same topic).
     *
     * @param topic    the topic to consume
     * @param consumer the function invoked for each received message
     * @param <T>      the message type
     * @return a future completed with the new consumer, or failed with the error reported
     *         for the Register command
     */
    public <T> CompletableFuture<MessageConsumer<T>> consumer(String topic, Function<T, ?> consumer) {
        CompletableFuture<MessageConsumer<T>> registered = new CompletableFuture<>();
        this.submit(new MessageBusCommands.Register(topic)).whenComplete((result, error) -> {
            if (error != null) {
                registered.completeExceptionally((Throwable)error);
                return;
            }
            InternalMessageConsumer<T> created = new InternalMessageConsumer<T>(topic, consumer);
            this.consumers.put(topic, created);
            registered.complete(created);
        });
        return registered;
    }

    /**
     * Picks the next remote consumer of the topic and resolves a connection to it.
     *
     * @param topic the topic to route
     * @return a future holding the connection, or holding {@code null} when the topic has no
     *         known remote consumer
     */
    private CompletableFuture<Connection> next(String topic) {
        RemoteConsumers candidates = this.remotes.get(topic);
        Address target = candidates != null ? candidates.next() : null;
        if (target == null) {
            return CompletableFuture.completedFuture(null);
        }
        return this.getConnection(target);
    }

    /**
     * Returns the cached connection for the address, or opens a new one when none is cached.
     *
     * <p>NOTE(review): the cache key is {@code address.hashCode()}, so two different addresses
     * with the same hash code would resolve to the same connection - confirm this is intended.
     *
     * @param address the remote address
     * @return a future holding the connection
     */
    private CompletableFuture<Connection> getConnection(Address address) {
        Connection cached = this.connections.get(address.hashCode());
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return this.createConnection(address);
    }

    /**
     * Connects to the address through the transport client and caches the resulting connection.
     *
     * <p>A close listener is attached so the cache entry is removed when the connection closes.
     *
     * @param address the remote address to connect to
     * @return a future holding the newly opened connection
     */
    private CompletableFuture<Connection> createConnection(Address address) {
        return this.client.connect(address).thenApply(opened -> {
            this.connections.put(address.hashCode(), (Connection)opened);
            opened.closeListener(closed -> this.connections.remove(address.hashCode()));
            return opened;
        });
    }

    /**
     * Closes the transport server.
     *
     * <p>If a close is already in progress its future is returned. If {@code listen} was never
     * called (no server exists) the returned future fails with {@link IllegalStateException}.
     * The {@code open} flag is cleared once the server close finishes, whether or not it
     * succeeded.
     *
     * @return a future completed when the server has closed, or failed with the close error
     */
    @Override
    public synchronized CompletableFuture<Void> close() {
        CompletableFuture<Void> inFlight = this.closeFuture;
        if (inFlight != null) {
            return inFlight;
        }
        if (this.server == null) {
            return Futures.exceptionalFuture(new IllegalStateException("message bus not open"));
        }
        this.closeFuture = new CompletableFuture<>();
        super.client.context().execute(() -> this.server.close().whenComplete((result, error) -> {
            synchronized (this) {
                this.open = false;
                CompletableFuture<Void> waiting = this.closeFuture;
                if (waiting == null) {
                    return;
                }
                // Clear the field before completing so a later close() starts a fresh attempt.
                this.closeFuture = null;
                if (error == null) {
                    waiting.complete(null);
                } else {
                    waiting.completeExceptionally((Throwable)error);
                }
            }
        }));
        return this.closeFuture;
    }

    /**
     * Reports whether the bus is not open.
     *
     * @return the negation of the {@code open} flag; this is also {@code true} before the
     *         server has started listening
     */
    @Override
    public boolean isClosed() {
        return !this.open;
    }

    /**
     * Message bus options. Adds an {@code address} property, stored as {@code host:port},
     * on top of {@link Resource.Options}.
     */
    public static class Options
    extends Resource.Options {
        /** Property key under which the listen address is stored. */
        private static final String ADDRESS = "address";

        /** Creates options with no explicit defaults. */
        public Options() {
        }

        /**
         * Creates options on top of the given properties.
         *
         * @param defaults the properties passed to the superclass constructor
         */
        public Options(Properties defaults) {
            super(defaults);
        }

        /**
         * Parses the {@code address} property into an {@link Address}.
         *
         * <p>NOTE(review): the exception messages are built by concatenation and passed as the
         * first argument together with an empty argument array; if
         * {@code ConfigurationException} treats that argument as a format string, a {@code %}
         * in the configured value could be misinterpreted - confirm.
         *
         * @return the configured address
         * @throws ConfigurationException if the property is missing, does not consist of exactly
         *         two {@code :}-separated parts, or has a non-numeric port
         */
        public Address getAddress() {
            final String raw = this.getProperty(ADDRESS);
            if (raw == null) {
                throw new ConfigurationException("missing required message bus property: address", new Object[0]);
            }
            final String[] parts = raw.split(":");
            if (parts.length != 2) {
                throw new ConfigurationException("malformed address string: " + raw, new Object[0]);
            }
            final String host = parts[0];
            final String portText = parts[1];
            try {
                return new Address(host, Integer.parseInt(portText));
            }
            catch (NumberFormatException e) {
                throw new ConfigurationException("malformed port: " + portText, new Object[0]);
            }
        }

        /**
         * Stores the address as a {@code host:port} string under the {@code address} property.
         *
         * @param address the address to store
         * @return this options instance, for chaining
         */
        public Options withAddress(Address address) {
            final String encoded = String.format("%s:%s", address.host(), address.port());
            this.setProperty(ADDRESS, encoded);
            return this;
        }
    }

    /**
     * The addresses consuming one topic, handed out in round-robin order.
     *
     * <p>NOTE(review): this class does no synchronization of its own; confirm that
     * {@code add}/{@code remove} and {@code next} are never invoked concurrently.
     */
    private static class RemoteConsumers {
        /** Consumer addresses in registration order. */
        private final List<Address> consumers;
        /** Current round-robin position; {@code null} until the first call to {@code next}. */
        private Iterator<Address> iterator;

        /**
         * @param consumers the initial consumer addresses; copied into an internal list
         */
        private RemoteConsumers(Set<Address> consumers) {
            this.consumers = new ArrayList<Address>(consumers);
        }

        /**
         * Appends an address and restarts the round-robin from the beginning.
         *
         * @param address the consumer address to add
         */
        private void add(Address address) {
            this.consumers.add(address);
            this.iterator = this.consumers.iterator();
        }

        /**
         * Removes an address.
         *
         * @param address the consumer address to remove
         * @return {@code true} if no consumers remain afterwards
         */
        private boolean remove(Address address) {
            if (this.consumers.remove(address)) {
                // The list changed structurally, so the old iterator would throw
                // ConcurrentModificationException on its next use; restart it, as add() does.
                this.iterator = this.consumers.iterator();
            }
            return this.consumers.isEmpty();
        }

        /**
         * Returns the next address in round-robin order.
         *
         * @return the next consumer address, or {@code null} when there are none
         */
        private Address next() {
            if (this.consumers.isEmpty()) {
                return null;
            }
            if (this.iterator == null || !this.iterator.hasNext()) {
                // Wrap around to the first consumer.
                this.iterator = this.consumers.iterator();
            }
            return this.iterator.next();
        }
    }

    /**
     * Producer bound to a single topic; each send is routed to the next remote consumer
     * chosen by the enclosing bus.
     *
     * @param <T> the message type
     */
    private class InternalMessageProducer<T>
    implements MessageProducer<T> {
        /** Topic this producer sends to. */
        private final String topic;

        /**
         * @param topic the topic this producer sends to
         */
        private InternalMessageProducer(String topic) {
            this.topic = topic;
        }

        /**
         * @return the topic this producer sends to
         */
        @Override
        public String topic() {
            return this.topic;
        }

        /**
         * Sends the message, wrapped in a {@link Message} for this topic, over the connection
         * selected by the enclosing bus.
         *
         * @param message the message body
         * @return the future returned by the connection's send, or a future failed with
         *         {@link IllegalStateException} when no connection is available for the topic
         */
        @Override
        public <U> CompletableFuture<U> send(T message) {
            return DistributedMessageBus.this.next(this.topic).thenCompose(connection -> {
                if (connection != null) {
                    return connection.send(new Message(this.topic, message));
                }
                return Futures.exceptionalFuture(new IllegalStateException("no handlers"));
            });
        }

        /**
         * No-op close: the producer holds no state that needs releasing.
         *
         * @return an already-completed future
         */
        @Override
        public CompletableFuture<Void> close() {
            return CompletableFuture.completedFuture(null);
        }
    }

    private class InternalMessageConsumer<T>
    implements MessageConsumer<T> {
        private final String topic;
        private Function<T, ?> consumer;

        private InternalMessageConsumer(String topic, Function<T, ?> consumer) {
            this.topic = topic;
            this.consumer = consumer;
        }

        @Override
        public String topic() {
            return this.topic;
        }

        @Override
        public MessageConsumer<T> onMessage(Function<T, ?> consumer) {
            this.consumer = consumer;
            return this;
        }

        private CompletableFuture<Object> consume(Object message) {
            Object result = this.consumer.apply(message);
            if (result instanceof CompletableFuture) {
                return (CompletableFuture)result;
            }
            return CompletableFuture.completedFuture(result);
        }

        @Override
        public CompletableFuture<Void> close() {
            return DistributedMessageBus.this.submit(new MessageBusCommands.Unregister(this.topic));
        }
    }
}

