/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.messaging.state;

import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.session.Session;
import io.atomix.messaging.DistributedTaskQueue;
import io.atomix.messaging.state.TaskQueueCommands;
import io.atomix.resource.ResourceStateMachine;
import io.atomix.resource.ResourceType;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Resource state machine for {@link DistributedTaskQueue}.
 *
 * <p>Bookkeeping kept by this class:
 * <ul>
 *   <li>{@code workers} - the subscribe commit of every subscribed session, keyed by session id;</li>
 *   <li>{@code workerQueue} - subscribed sessions that currently hold no task;</li>
 *   <li>{@code taskQueue} - submitted tasks that have not yet been handed to a worker;</li>
 *   <li>{@code processing} - the task most recently handed to each worker, keyed by the
 *       worker's session id, until that worker acknowledges it.</li>
 * </ul>
 *
 * <p>Tasks are delivered to workers by publishing a {@code "process"} event on the worker's
 * session; submitters are notified of completion through an {@code "ack"} event.
 *
 * <p>NOTE(review): the fields use no synchronization; this presumably relies on the state
 * machine being driven by a single thread - confirm against the Copycat server contract.
 */
public class TaskQueueState extends ResourceStateMachine implements SessionListener {
    /** Subscribe commit held for each subscribed worker, keyed by session id. */
    private final Map<Long, Commit<TaskQueueCommands.Subscribe>> workers = new HashMap<Long, Commit<TaskQueueCommands.Subscribe>>();
    /** Subscribed sessions with no task in flight, in the order they became idle. */
    private final Queue<ServerSession> workerQueue = new ArrayDeque<ServerSession>();
    /** Submit commits waiting for an idle worker. */
    private final LinkedBlockingDeque<Commit<TaskQueueCommands.Submit>> taskQueue = new LinkedBlockingDeque<Commit<TaskQueueCommands.Submit>>();
    /** Submit commit currently handed to each worker, keyed by the worker's session id. */
    private final Map<Long, Commit<TaskQueueCommands.Submit>> processing = new HashMap<Long, Commit<TaskQueueCommands.Submit>>();

    /**
     * Creates the state machine, registering it under the {@link DistributedTaskQueue}
     * resource type.
     */
    public TaskQueueState() {
        super(new ResourceType(DistributedTaskQueue.class));
    }

    /**
     * Removes every trace of {@code session} as a worker.
     *
     * <p>The session's subscribe commit (if any) is closed and the session is dropped from the
     * idle queue. If the session was in the middle of a task, that task is handed to the next
     * idle worker, or pushed back to the <em>front</em> of the task queue when no worker is idle.
     *
     * @param session the session that is going away (also used by {@link #unsubscribe})
     */
    @Override
    public void close(ServerSession session) {
        Commit<TaskQueueCommands.Subscribe> subscription = this.workers.remove(session.id());
        if (subscription != null) {
            subscription.close();
        }
        this.workerQueue.remove(session);

        Commit<TaskQueueCommands.Submit> orphaned = this.processing.remove(session.id());
        if (orphaned == null) {
            return;
        }

        ServerSession next = this.workerQueue.poll();
        if (next != null) {
            // Track the hand-off; without this entry the new worker's ack() would not find the
            // task, would fail with "unknown task", and the submit commit would never be closed.
            this.processing.put(next.id(), orphaned);
            next.publish("process", orphaned.operation().task());
        } else {
            // No idle worker: requeue at the head so the interrupted task is retried first.
            this.taskQueue.addFirst(orphaned);
        }
    }

    /**
     * Registers the committing session as a worker.
     *
     * <p>If a task is waiting it is handed to the session immediately via a {@code "process"}
     * event; otherwise the session joins the idle queue. The commit is retained in
     * {@code workers} until the session is closed or unsubscribes.
     *
     * <p>A repeated subscribe from a session that is already subscribed replaces (and closes)
     * the earlier commit. If that session is already idle-queued or busy with a task, nothing
     * else changes, so it is neither queued twice nor has its in-flight task overwritten.
     *
     * @param commit the subscribe commit
     */
    public void subscribe(Commit<TaskQueueCommands.Subscribe> commit) {
        ServerSession worker = commit.session();

        Commit<TaskQueueCommands.Subscribe> superseded = this.workers.put(worker.id(), commit);
        if (superseded != null) {
            superseded.close();
            if (this.processing.containsKey(worker.id()) || this.workerQueue.contains(worker)) {
                return;
            }
        }

        Commit<TaskQueueCommands.Submit> pending = this.taskQueue.poll();
        if (pending != null) {
            this.processing.put(worker.id(), pending);
            worker.publish("process", pending.operation().task());
        } else {
            this.workerQueue.add(worker);
        }
    }

    /**
     * Unregisters the committing session as a worker; see {@link #close(ServerSession)} for what
     * happens to its state and to any task it was processing.
     *
     * <p>The unsubscribe commit itself is always closed, since nothing retains it afterwards.
     *
     * @param commit the unsubscribe commit
     */
    public void unsubscribe(Commit<TaskQueueCommands.Unsubscribe> commit) {
        try {
            this.close(commit.session());
        } finally {
            commit.close();
        }
    }

    /**
     * Accepts a new task.
     *
     * <p>The task is published to the longest-idle worker if there is one, and recorded as that
     * worker's in-flight task; otherwise it is appended to the task queue. The commit is retained
     * until the task is acknowledged. If anything throws, the commit is closed and the exception
     * is rethrown.
     *
     * @param commit the submit commit carrying the task
     */
    public void submit(Commit<TaskQueueCommands.Submit> commit) {
        try {
            ServerSession worker = this.workerQueue.poll();
            if (worker == null) {
                this.taskQueue.add(commit);
                return;
            }
            worker.publish("process", commit.operation().task());
            this.processing.put(worker.id(), commit);
        } catch (Exception e) {
            commit.close();
            throw e;
        }
    }

    /**
     * Acknowledges completion of the task currently assigned to the committing worker.
     *
     * <p>The finished task's submit commit is closed, after publishing an {@code "ack"} event
     * (carrying the task's id) to the submitter when the submit operation asked for one and the
     * submitter's session is still {@link Session.State#OPEN}. The worker then either receives
     * the next queued task as the return value, or is placed on the idle queue.
     *
     * @param commit the ack commit; always closed before this method returns or throws
     * @return the next task for this worker, or {@code null} if no task is waiting
     * @throws IllegalStateException if the worker has no task recorded as in flight
     */
    public Object ack(Commit<TaskQueueCommands.Ack> commit) {
        try {
            ServerSession worker = commit.session();

            Commit<TaskQueueCommands.Submit> acked = this.processing.remove(worker.id());
            if (acked == null) {
                throw new IllegalStateException("unknown task");
            }
            if (acked.operation().ack() && acked.session().state() == Session.State.OPEN) {
                acked.session().publish("ack", acked.operation().id());
            }
            acked.close();

            Commit<TaskQueueCommands.Submit> next = this.taskQueue.poll();
            if (next == null) {
                this.workerQueue.add(worker);
                return null;
            }
            this.processing.put(worker.id(), next);
            return next.operation().task();
        } finally {
            commit.close();
        }
    }
}

