/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.messaging;

import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.messaging.state.TopicCommands;
import io.atomix.messaging.state.TopicState;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import io.atomix.resource.WriteConsistency;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

@ResourceTypeInfo(id=-31, stateMachine=TopicState.class, typeResolver=TopicCommands.TypeResolver.class)
public class DistributedTopic<T>
extends Resource<DistributedTopic<T>> {
    private final Set<Consumer<T>> listeners = new HashSet<Consumer<T>>();

    public static Resource.Options options() {
        return new Resource.Options();
    }

    public static Resource.Config config() {
        return new Resource.Config();
    }

    public DistributedTopic(CopycatClient client, Resource.Options options) {
        super(client, options);
    }

    @Override
    public CompletableFuture<DistributedTopic<T>> open() {
        return super.open().thenApply(result -> {
            this.client.onEvent("message", event -> {
                for (Consumer<Object> consumer : this.listeners) {
                    consumer.accept(event);
                }
            });
            return result;
        });
    }

    public DistributedTopic<T> sync() {
        return (DistributedTopic)this.with(WriteConsistency.ATOMIC);
    }

    public DistributedTopic<T> async() {
        return (DistributedTopic)this.with(WriteConsistency.SEQUENTIAL_EVENT);
    }

    public CompletableFuture<Void> publish(T message) {
        return this.submit(new TopicCommands.Publish<T>(message));
    }

    public CompletableFuture<Listener<T>> subscribe(Consumer<T> listener) {
        if (!this.listeners.isEmpty()) {
            this.listeners.add(listener);
            return CompletableFuture.completedFuture(new TopicListener(listener));
        }
        this.listeners.add(listener);
        return this.submit(new TopicCommands.Listen()).thenApply(v -> new TopicListener(listener));
    }

    private class TopicListener
    implements Listener<T> {
        private final Consumer<T> listener;

        private TopicListener(Consumer<T> listener) {
            this.listener = listener;
        }

        @Override
        public void accept(T message) {
            this.listener.accept(message);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            DistributedTopic distributedTopic = DistributedTopic.this;
            synchronized (distributedTopic) {
                DistributedTopic.this.listeners.remove(this.listener);
                if (DistributedTopic.this.listeners.isEmpty()) {
                    DistributedTopic.this.submit(new TopicCommands.Unlisten());
                }
            }
        }
    }
}

