/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.messaging;

import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.messaging.state.TaskQueueCommands;
import io.atomix.messaging.state.TaskQueueState;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import io.atomix.resource.WriteConsistency;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

@ResourceTypeInfo(id=-32, stateMachine=TaskQueueState.class, typeResolver=TaskQueueCommands.TypeResolver.class)
public class DistributedTaskQueue<T>
extends Resource<DistributedTaskQueue<T>> {
    private long taskId;
    private final Map<Long, CompletableFuture<Void>> taskFutures = new ConcurrentHashMap<Long, CompletableFuture<Void>>();
    private Consumer<T> consumer;

    public static Resource.Options options() {
        return new Resource.Options();
    }

    public static Resource.Config config() {
        return new Resource.Config();
    }

    public DistributedTaskQueue(CopycatClient client, Resource.Options options) {
        super(client, options);
    }

    @Override
    public CompletableFuture<DistributedTaskQueue<T>> open() {
        return super.open().thenApply(result -> {
            this.client.onEvent("process", this::process);
            this.client.onEvent("ack", this::ack);
            return result;
        });
    }

    private void process(T task) {
        if (this.consumer != null) {
            this.consumer.accept(task);
            this.submit((T)new TaskQueueCommands.Ack()).whenComplete((result, error) -> {
                if (error == null && result != null) {
                    this.process(result);
                }
            });
        }
    }

    private void ack(long taskId) {
        CompletableFuture<Void> future = this.taskFutures.remove(taskId);
        if (future != null) {
            future.complete(null);
        }
    }

    public DistributedTaskQueue<T> sync() {
        return (DistributedTaskQueue)this.with(WriteConsistency.ATOMIC);
    }

    public DistributedTaskQueue<T> async() {
        return (DistributedTaskQueue)this.with(WriteConsistency.SEQUENTIAL_EVENT);
    }

    public CompletableFuture<Void> submit(T task) {
        if (this.writeConsistency() == WriteConsistency.ATOMIC) {
            return this.submitAtomic(task);
        }
        return this.submitSequential(task);
    }

    private CompletableFuture<Void> submitAtomic(T task) {
        long taskId = ++this.taskId;
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.taskFutures.put(taskId, future);
        this.submit((T)new TaskQueueCommands.Submit(taskId, task, true)).whenComplete((result, error) -> {
            if (error != null) {
                this.taskFutures.remove(taskId);
                future.completeExceptionally((Throwable)error);
            }
        });
        return future;
    }

    private CompletableFuture<Void> submitSequential(T task) {
        return this.submit((T)new TaskQueueCommands.Submit(++this.taskId, task, false));
    }

    public synchronized CompletableFuture<Listener<T>> consumer(Consumer<T> consumer) {
        if (this.consumer != null) {
            this.consumer = consumer;
            return CompletableFuture.completedFuture(new TaskQueueListener(consumer));
        }
        this.consumer = consumer;
        return this.submit((T)new TaskQueueCommands.Subscribe()).thenApply(v -> new TaskQueueListener(consumer));
    }

    private class TaskQueueListener
    implements Listener<T> {
        private final Consumer<T> listener;

        private TaskQueueListener(Consumer<T> listener) {
            this.listener = listener;
        }

        @Override
        public void accept(T message) {
            this.listener.accept(message);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            DistributedTaskQueue distributedTaskQueue = DistributedTaskQueue.this;
            synchronized (distributedTaskQueue) {
                DistributedTaskQueue.this.consumer = null;
                DistributedTaskQueue.this.submit((Object)new TaskQueueCommands.Unsubscribe());
            }
        }
    }
}

