/*
 * Decompiled with CFR 0.152.
 */
package net.kuujo.copycat.state.internal;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import net.kuujo.copycat.CopycatException;
import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.resource.internal.ResourceManager;
import net.kuujo.copycat.state.StateLog;
import net.kuujo.copycat.state.StateLogConfig;
import net.kuujo.copycat.state.internal.SnapshottableLogManager;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.function.TriConsumer;
import net.kuujo.copycat.util.internal.Assert;
import net.kuujo.copycat.util.serializer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultStateLog<T>
extends AbstractResource<StateLog<T>>
implements StateLog<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultStateLog.class);
    private static final int SNAPSHOT_ENTRY = 0;
    private static final int COMMAND_ENTRY = 1;
    private static final int SNAPSHOT_CHUNK_SIZE = 0x100000;
    private static final int SNAPSHOT_INFO = 0;
    private static final int SNAPSHOT_CHUNK = 1;
    private final Map<Integer, OperationInfo> operations = new ConcurrentHashMap<Integer, OperationInfo>(128);
    private final Set<TriConsumer<String, Object, Object>> watchers = Sets.newCopyOnWriteArraySet();
    private final Consistency defaultConsistency;
    private final SnapshottableLogManager log;
    private Supplier snapshotter;
    private Consumer installer;
    private SnapshotInfo snapshotInfo;
    private List<ByteBuffer> snapshotChunks;

    public DefaultStateLog(ResourceManager context) {
        super(context);
        this.log = (SnapshottableLogManager)context.log();
        this.defaultConsistency = ((StateLogConfig)context.config().getResourceConfig()).getDefaultConsistency();
        context.consumer(this::consume);
    }

    @Override
    public <U extends T, V> StateLog<T> registerCommand(String name, Function<U, V> command) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot register command on open state log", (Object[])new Object[0]);
        this.operations.put(name.hashCode(), new OperationInfo(name, command, false));
        LOGGER.debug("{} - Registered state log command {}", (Object)this.context.name(), (Object)name);
        return this;
    }

    @Override
    public StateLog<T> unregisterCommand(String name) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot unregister command on open state log", (Object[])new Object[0]);
        OperationInfo info = this.operations.remove(name.hashCode());
        if (info != null) {
            LOGGER.debug("{} - Unregistered state log command {}", (Object)this.context.name(), (Object)name);
        }
        return this;
    }

    @Override
    public StateLog<T> registerWatcher(TriConsumer<String, Object, Object> watcher) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot register watcher on open state log", (Object[])new Object[0]);
        this.watchers.add(watcher);
        return this;
    }

    @Override
    public StateLog<T> unregisterWatcher(TriConsumer<String, Object, Object> watcher) {
        this.watchers.remove(watcher);
        return this;
    }

    @Override
    public <U extends T, V> StateLog<T> registerQuery(String name, Function<U, V> query) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot register command on open state log", (Object[])new Object[0]);
        Assert.isNotNull((Object)name, (String)"name");
        Assert.isNotNull(query, (String)"query");
        this.operations.put(name.hashCode(), new OperationInfo(name, query, true, this.defaultConsistency));
        LOGGER.debug("{} - Registered state log query {} with default consistency", (Object)this.context.name(), (Object)name);
        return this;
    }

    @Override
    public <U extends T, V> StateLog<T> registerQuery(String name, Function<U, V> query, Consistency consistency) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot register command on open state log", (Object[])new Object[0]);
        Assert.isNotNull((Object)name, (String)"name");
        Assert.isNotNull(query, (String)"query");
        this.operations.put(name.hashCode(), new OperationInfo(name, query, true, consistency == null || consistency == Consistency.DEFAULT ? this.defaultConsistency : consistency));
        LOGGER.debug("{} - Registered state log query {} with consistency {}", new Object[]{this.context.name(), name, consistency});
        return this;
    }

    @Override
    public StateLog<T> unregisterQuery(String name) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot unregister command on open state log", (Object[])new Object[0]);
        OperationInfo info = this.operations.remove(name.hashCode());
        if (info != null) {
            LOGGER.debug("{} - Unregistered state log query {}", (Object)this.context.name(), (Object)name);
        }
        return this;
    }

    @Override
    public StateLog<T> unregister(String name) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot unregister command on open state log", (Object[])new Object[0]);
        OperationInfo info = this.operations.remove(name.hashCode());
        if (info != null) {
            LOGGER.debug("{} - Unregistered state log operation {}", (Object)this.context.name(), (Object)name);
        }
        return this;
    }

    @Override
    public <V> StateLog<T> snapshotWith(Supplier<V> snapshotter) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot modify state log once opened", (Object[])new Object[0]);
        this.snapshotter = snapshotter;
        LOGGER.debug("{} - Registered state log snapshot handler", (Object)this.context.name());
        return this;
    }

    @Override
    public <V> StateLog<T> installWith(Consumer<V> installer) {
        Assert.state((boolean)this.isClosed(), (String)"Cannot modify state log once opened", (Object[])new Object[0]);
        this.installer = installer;
        LOGGER.debug("{} - Registered state log install handler", (Object)this.context.name());
        return this;
    }

    @Override
    public <U> CompletableFuture<U> submit(String command, T entry) {
        Assert.state((boolean)this.isOpen(), (String)"State log not open", (Object[])new Object[0]);
        OperationInfo operationInfo = this.operations.get(command.hashCode());
        if (operationInfo == null) {
            return Futures.exceptionalFutureAsync((Throwable)new CopycatException(String.format("Invalid state log command %s", command), new Object[0]), (Executor)this.executor);
        }
        ByteBuffer buffer = this.serializer.writeObject(entry);
        ByteBuffer commandEntry = ByteBuffer.allocate(8 + buffer.capacity());
        commandEntry.putInt(1);
        commandEntry.putInt(command.hashCode());
        commandEntry.put(buffer);
        commandEntry.rewind();
        if (operationInfo.readOnly) {
            LOGGER.debug("{} - Submitting state log query {} with entry {}", new Object[]{this.context.name(), command, entry});
            return this.context.query(commandEntry, operationInfo.consistency).thenApplyAsync(arg_0 -> ((Serializer)this.serializer).readObject(arg_0), this.executor);
        }
        LOGGER.debug("{} - Submitting state log command {} with entry {}", new Object[]{this.context.name(), command, entry});
        return this.context.commit(commandEntry).thenApplyAsync(arg_0 -> ((Serializer)this.serializer).readObject(arg_0), this.executor);
    }

    public synchronized CompletableFuture<StateLog<T>> open() {
        return ((CompletableFuture)this.runStartupTasks().thenComposeAsync(v -> this.context.open(), this.executor)).thenApply(v -> this);
    }

    public synchronized CompletableFuture<Void> close() {
        return this.context.close().thenComposeAsync(v -> this.runShutdownTasks(), this.executor);
    }

    private ByteBuffer consume(long term, Long index, ByteBuffer entry) {
        int entryType = entry.getInt();
        switch (entryType) {
            case 0: {
                if (this.installSnapshot(entry.slice())) {
                    LOGGER.info("Installed snapshot upto index {}", (Object)index);
                    try {
                        this.log.split(index);
                    }
                    catch (IOException e) {
                        LOGGER.warn("Split failed", (Throwable)e);
                    }
                }
                return ByteBuffer.allocate(0);
            }
            case 1: {
                int commandCode = entry.getInt();
                OperationInfo operationInfo = this.operations.get(commandCode);
                if (operationInfo != null) {
                    Object input = this.serializer.readObject(entry.slice());
                    Object output = operationInfo.execute(term, index, input);
                    if (!operationInfo.readOnly) {
                        this.watchers.forEach(w -> w.accept((Object)operationInfo.name, input, output));
                    }
                    return this.serializer.writeObject(output);
                }
                throw new IllegalStateException("Invalid state log operation");
            }
        }
        throw new IllegalArgumentException("Invalid entry type");
    }

    private void checkSnapshot(long term, long index) {
        if (this.log.isSnapshottable(index)) {
            this.takeSnapshot(term, index);
        }
    }

    private void takeSnapshot(long term, long index) {
        byte[] bytes;
        String id = UUID.randomUUID().toString();
        LOGGER.info("{} - Taking snapshot {} upto index {}", new Object[]{this.context.name(), id, index});
        Object snapshot = this.snapshotter != null ? this.snapshotter.get() : null;
        ByteBuffer snapshotBuffer = snapshot != null ? this.serializer.writeObject(snapshot) : ByteBuffer.allocate(0);
        LOGGER.debug("{} - Calculating snapshot chunk size for snapshot {}", (Object)this.context.name(), (Object)id);
        byte[] snapshotId = id.getBytes();
        int numChunks = (int)Math.ceil((double)snapshotBuffer.limit() / 1048576.0);
        LOGGER.debug("{} - Creating {} chunks for snapshot {}", new Object[]{this.context.name(), numChunks, id});
        ArrayList<ByteBuffer> chunks = new ArrayList<ByteBuffer>(numChunks + 1);
        ByteBuffer info = ByteBuffer.allocate(28 + snapshotId.length);
        info.putLong(term);
        info.putInt(0);
        info.putInt(0);
        info.putInt(snapshotId.length);
        info.put(snapshotId);
        info.putInt(snapshotBuffer.limit());
        info.putInt(numChunks);
        chunks.add(info);
        int i = 0;
        for (int position = 0; position < snapshotBuffer.limit(); position += bytes.length) {
            bytes = new byte[Math.min(snapshotBuffer.limit() - position, 0x100000)];
            snapshotBuffer.get(bytes);
            ByteBuffer chunk = ByteBuffer.allocate(24 + bytes.length);
            chunk.putLong(term);
            chunk.putInt(0);
            chunk.putInt(1);
            chunk.putInt(i++);
            chunk.putInt(bytes.length);
            chunk.put(bytes);
            chunk.flip();
            chunks.add(chunk);
        }
        try {
            LOGGER.debug("{} - Appending {} chunks for snapshot {} at index {}", new Object[]{this.context.name(), chunks.size(), id, index});
            this.log.appendSnapshot(index, chunks);
        }
        catch (IOException e) {
            throw new CopycatException("Failed to compact state log", new Object[]{e});
        }
    }

    private boolean installSnapshot(ByteBuffer snapshotChunk) {
        boolean installedSnapshot = false;
        int type = snapshotChunk.getInt();
        if (type == 0) {
            int idLength = snapshotChunk.getInt();
            byte[] idBytes = new byte[idLength];
            snapshotChunk.get(idBytes);
            String id = new String(idBytes);
            int size = snapshotChunk.getInt();
            int numChunks = snapshotChunk.getInt();
            if (this.snapshotInfo == null || !this.snapshotInfo.id.equals(id)) {
                LOGGER.debug("{} - Processing snapshot metadata for snapshot {}", (Object)this.context.name(), (Object)id);
                this.snapshotInfo = new SnapshotInfo(id, size, numChunks);
                this.snapshotChunks = new ArrayList<ByteBuffer>(numChunks);
            }
        } else if (type == 1 && this.snapshotInfo != null) {
            int index = snapshotChunk.getInt();
            int chunkLength = snapshotChunk.getInt();
            byte[] chunkBytes = new byte[chunkLength];
            snapshotChunk.get(chunkBytes);
            if (this.snapshotChunks.size() == index) {
                LOGGER.debug("{} - Processing snapshot chunk {} for snapshot {}", new Object[]{this.context.name(), index, this.snapshotInfo.id});
                this.snapshotChunks.add(ByteBuffer.wrap(chunkBytes));
                if (this.snapshotChunks.size() == this.snapshotInfo.chunks) {
                    LOGGER.debug("{} - Completed assembly of snapshot {} from log", (Object)this.context.name(), (Object)this.snapshotInfo.id);
                    if (this.installer != null) {
                        int size = 0;
                        for (ByteBuffer chunk : this.snapshotChunks) {
                            size += chunk.limit();
                        }
                        Assert.state((size == this.snapshotInfo.size ? 1 : 0) != 0, (String)"Received inconsistent snapshot", (Object[])new Object[0]);
                        LOGGER.debug("{} - Assembled snapshot size: {} bytes", (Object)this.context.name(), (Object)size);
                        if (size > 0) {
                            ByteBuffer completeSnapshot = ByteBuffer.allocate(size);
                            this.snapshotChunks.forEach(completeSnapshot::put);
                            completeSnapshot.flip();
                            LOGGER.info("{} - Installing snapshot {}", (Object)this.context.name(), (Object)this.snapshotInfo.id);
                            try {
                                this.installer.accept(this.serializer.readObject(completeSnapshot));
                                installedSnapshot = true;
                            }
                            catch (Exception e) {
                                LOGGER.warn("{} - Failed to install snapshot: {}", (Object)this.context.name(), (Object)e.getMessage());
                            }
                        }
                    }
                    this.snapshotInfo = null;
                    this.snapshotChunks = null;
                }
            }
        }
        return installedSnapshot;
    }

    public String toString() {
        return String.format("%s[name=%s]", this.getClass().getSimpleName(), this.context.name());
    }

    private class OperationInfo<TT, U> {
        private final String name;
        private final Function<TT, U> function;
        private final boolean readOnly;
        private final Consistency consistency;

        private OperationInfo(String name, Function<TT, U> function, boolean readOnly) {
            this(name, function, readOnly, Consistency.DEFAULT);
        }

        private OperationInfo(String name, Function<TT, U> function, boolean readOnly, Consistency consistency) {
            this.name = name;
            this.function = function;
            this.readOnly = readOnly;
            this.consistency = consistency;
        }

        private U execute(long term, Long index, TT entry) {
            if (index != null) {
                DefaultStateLog.this.checkSnapshot(term, index);
            }
            return this.function.apply(entry);
        }
    }

    private static class SnapshotChunk {
        private final int index;
        private final ByteBuffer chunk;

        private SnapshotChunk(int index, ByteBuffer chunk) {
            this.index = index;
            this.chunk = chunk;
        }
    }

    private static class SnapshotInfo {
        private final String id;
        private final int size;
        private final int chunks;

        private SnapshotInfo(String id, int size, int chunks) {
            this.id = id;
            this.size = size;
            this.chunks = chunks;
        }
    }
}

