/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.primitives.resources.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.AtomicLongMap;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.onlab.util.CountDownCompleter;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AtomixWorkQueueState
extends ResourceStateMachine
implements SessionListener,
Snapshottable {
    private final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private final AtomicLong totalCompleted = new AtomicLong(0L);
    private final Queue<TaskHolder> unassignedTasks = Queues.newArrayDeque();
    private final Map<String, TaskAssignment> assignments = Maps.newHashMap();
    private final Map<Long, Commit<? extends AtomixWorkQueueCommands.Register>> registeredWorkers = Maps.newHashMap();
    private final AtomicLongMap<Long> activeTasksPerSession = AtomicLongMap.create();

    protected AtomixWorkQueueState(Properties config) {
        super(config);
    }

    protected void configure(StateMachineExecutor executor) {
        executor.register(AtomixWorkQueueCommands.Stats.class, this::stats);
        executor.register(AtomixWorkQueueCommands.Register.class, this::register);
        executor.register(AtomixWorkQueueCommands.Unregister.class, this::unregister);
        executor.register(AtomixWorkQueueCommands.Add.class, this::add);
        executor.register(AtomixWorkQueueCommands.Take.class, this::take);
        executor.register(AtomixWorkQueueCommands.Complete.class, this::complete);
        executor.register(AtomixWorkQueueCommands.Clear.class, this::clear);
    }

    protected WorkQueueStats stats(Commit<? extends AtomixWorkQueueCommands.Stats> commit) {
        try {
            WorkQueueStats workQueueStats = WorkQueueStats.builder().withTotalCompleted(this.totalCompleted.get()).withTotalPending((long)this.unassignedTasks.size()).withTotalInProgress((long)this.assignments.size()).build();
            return workQueueStats;
        }
        finally {
            commit.close();
        }
    }

    protected void clear(Commit<? extends AtomixWorkQueueCommands.Clear> commit) {
        try {
            this.unassignedTasks.forEach(TaskHolder::complete);
            this.unassignedTasks.clear();
            this.assignments.values().forEach(TaskAssignment::markComplete);
            this.assignments.clear();
            this.registeredWorkers.values().forEach(Commit::close);
            this.registeredWorkers.clear();
            this.activeTasksPerSession.clear();
            this.totalCompleted.set(0L);
        }
        finally {
            commit.close();
        }
    }

    protected void register(Commit<? extends AtomixWorkQueueCommands.Register> commit) {
        long sessionId = commit.session().id();
        if (this.registeredWorkers.putIfAbsent(sessionId, commit) != null) {
            commit.close();
        }
    }

    protected void unregister(Commit<? extends AtomixWorkQueueCommands.Unregister> commit) {
        try {
            Commit<? extends AtomixWorkQueueCommands.Register> registerCommit = this.registeredWorkers.remove(commit.session().id());
            if (registerCommit != null) {
                registerCommit.close();
            }
        }
        finally {
            commit.close();
        }
    }

    protected void add(Commit<? extends AtomixWorkQueueCommands.Add> commit) {
        Collection<byte[]> items = ((AtomixWorkQueueCommands.Add)commit.operation()).items();
        CountDownCompleter referenceTracker = new CountDownCompleter(commit, (long)items.size(), Commit::close);
        AtomicInteger itemIndex = new AtomicInteger(0);
        items.forEach(item -> {
            String taskId = String.format("%d:%d:%d", commit.session().id(), commit.index(), itemIndex.getAndIncrement());
            this.unassignedTasks.add(new TaskHolder((Task<byte[]>)new Task(taskId, item), (CountDownCompleter<Commit<? extends AtomixWorkQueueCommands.Add>>)referenceTracker));
        });
        this.registeredWorkers.values().stream().map(Commit::session).forEach(session -> session.publish("task-available"));
    }

    protected Collection<Task<byte[]>> take(Commit<? extends AtomixWorkQueueCommands.Take> commit) {
        try {
            if (this.unassignedTasks.isEmpty()) {
                ImmutableList immutableList = ImmutableList.of();
                return immutableList;
            }
            long sessionId = commit.session().id();
            int maxTasks = ((AtomixWorkQueueCommands.Take)commit.operation()).maxTasks();
            Collection collection = IntStream.range(0, Math.min(maxTasks, this.unassignedTasks.size())).mapToObj(i -> {
                TaskHolder holder = this.unassignedTasks.poll();
                String taskId = holder.task().taskId();
                TaskAssignment assignment = new TaskAssignment(sessionId, holder);
                this.assignments.put(taskId, assignment);
                this.activeTasksPerSession.incrementAndGet((Object)sessionId);
                return holder.task();
            }).collect(Collectors.toCollection(ArrayList::new));
            return collection;
        }
        catch (Exception e) {
            this.log.warn("State machine update failed", (Throwable)e);
            throw Throwables.propagate((Throwable)e);
        }
        finally {
            commit.close();
        }
    }

    protected void complete(Commit<? extends AtomixWorkQueueCommands.Complete> commit) {
        long sessionId = commit.session().id();
        try {
            ((AtomixWorkQueueCommands.Complete)commit.operation()).taskIds().forEach(taskId -> {
                TaskAssignment assignment = this.assignments.get(taskId);
                if (assignment != null && assignment.sessionId() == sessionId) {
                    this.assignments.remove(taskId).markComplete();
                    this.totalCompleted.incrementAndGet();
                    this.activeTasksPerSession.decrementAndGet((Object)sessionId);
                }
            });
        }
        catch (Exception e) {
            this.log.warn("State machine update failed", (Throwable)e);
            throw Throwables.propagate((Throwable)e);
        }
        finally {
            commit.close();
        }
    }

    public void register(ServerSession session) {
    }

    public void unregister(ServerSession session) {
        this.evictWorker(session.id());
    }

    public void expire(ServerSession session) {
        this.evictWorker(session.id());
    }

    public void close(ServerSession session) {
        this.evictWorker(session.id());
    }

    public void snapshot(SnapshotWriter writer) {
        writer.writeLong(this.totalCompleted.get());
    }

    public void install(SnapshotReader reader) {
        this.totalCompleted.set(reader.readLong());
    }

    private void evictWorker(long sessionId) {
        Commit<? extends AtomixWorkQueueCommands.Register> commit = this.registeredWorkers.remove(sessionId);
        if (commit != null) {
            commit.close();
        }
        Iterator<Map.Entry<String, TaskAssignment>> iter = this.assignments.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, TaskAssignment> entry = iter.next();
            TaskAssignment assignment = entry.getValue();
            if (assignment.sessionId() != sessionId) continue;
            this.unassignedTasks.add(assignment.taskHolder());
            iter.remove();
        }
        this.activeTasksPerSession.remove((Object)sessionId);
        this.activeTasksPerSession.removeAllZeros();
    }

    private class TaskAssignment {
        private final long sessionId;
        private final TaskHolder taskHolder;

        public TaskAssignment(long sessionId, TaskHolder taskHolder) {
            this.sessionId = sessionId;
            this.taskHolder = taskHolder;
        }

        public long sessionId() {
            return this.sessionId;
        }

        public TaskHolder taskHolder() {
            return this.taskHolder;
        }

        public void markComplete() {
            this.taskHolder.complete();
        }

        public String toString() {
            return MoreObjects.toStringHelper(this.getClass()).add("sessionId", this.sessionId).add("taskHolder", (Object)this.taskHolder).toString();
        }
    }

    private class TaskHolder {
        private final Task<byte[]> task;
        private final CountDownCompleter<Commit<? extends AtomixWorkQueueCommands.Add>> referenceTracker;

        public TaskHolder(Task<byte[]> delegate, CountDownCompleter<Commit<? extends AtomixWorkQueueCommands.Add>> referenceTracker) {
            this.task = delegate;
            this.referenceTracker = referenceTracker;
        }

        public Task<byte[]> task() {
            return this.task;
        }

        public void complete() {
            this.referenceTracker.countDown();
        }
    }
}

