/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayDataOutputView;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RocksDBMapState<K, N, UK, UV>
extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
    private final TypeSerializer<UK> userKeySerializer;
    private final TypeSerializer<UV> userValueSerializer;

    private RocksDBMapState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, TypeSerializer<Map<UK, UV>> valueSerializer, Map<UK, UV> defaultValue, RocksDBKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        Preconditions.checkState((boolean)(valueSerializer instanceof MapSerializer), (Object)"Unexpected serializer type.");
        MapSerializer castedMapSerializer = (MapSerializer)valueSerializer;
        this.userKeySerializer = castedMapSerializer.getKeySerializer();
        this.userValueSerializer = castedMapSerializer.getValueSerializer();
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    public TypeSerializer<Map<UK, UV>> getValueSerializer() {
        return this.valueSerializer;
    }

    public UV get(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(userKey);
        byte[] rawValueBytes = this.backend.db.get(this.columnFamily, rawKeyBytes);
        return rawValueBytes == null ? null : (UV)this.deserializeUserValue(rawValueBytes);
    }

    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(userKey);
        byte[] rawValueBytes = this.serializeUserValue(userValue);
        this.backend.db.put(this.columnFamily, this.writeOptions, rawKeyBytes, rawValueBytes);
    }

    public void putAll(Map<UK, UV> map23) throws IOException, RocksDBException {
        if (map23 == null) {
            return;
        }
        try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.backend.db, this.writeOptions);){
            for (Map.Entry<UK, UV> entry : map23.entrySet()) {
                byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
                byte[] rawValueBytes = this.serializeUserValue(entry.getValue());
                writeBatchWrapper.put(this.columnFamily, rawKeyBytes, rawValueBytes);
            }
        }
    }

    public void remove(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(userKey);
        this.backend.db.delete(this.columnFamily, this.writeOptions, rawKeyBytes);
    }

    public boolean contains(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(userKey);
        byte[] rawValueBytes = this.backend.db.get(this.columnFamily, rawKeyBytes);
        return rawValueBytes != null;
    }

    public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
        Iterator iterator2 = this.iterator();
        if (!iterator2.hasNext()) {
            return null;
        }
        return () -> iterator2;
    }

    public Iterable<UK> keys() throws IOException {
        byte[] prefixBytes = this.serializeCurrentKeyAndNamespace();
        return () -> new RocksDBMapIterator<UK>(this.backend.db, prefixBytes, (TypeSerializer)this.userKeySerializer, (TypeSerializer)this.userValueSerializer){

            @Override
            public UK next() {
                RocksDBMapEntry entry = this.nextEntry();
                return entry == null ? null : (Object)entry.getKey();
            }
        };
    }

    public Iterable<UV> values() throws IOException {
        byte[] prefixBytes = this.serializeCurrentKeyAndNamespace();
        return () -> new RocksDBMapIterator<UV>(this.backend.db, prefixBytes, (TypeSerializer)this.userKeySerializer, (TypeSerializer)this.userValueSerializer){

            @Override
            public UV next() {
                RocksDBMapEntry entry = this.nextEntry();
                return entry == null ? null : (Object)entry.getValue();
            }
        };
    }

    public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
        byte[] prefixBytes = this.serializeCurrentKeyAndNamespace();
        return new RocksDBMapIterator<Map.Entry<UK, UV>>(this.backend.db, prefixBytes, (TypeSerializer)this.userKeySerializer, (TypeSerializer)this.userValueSerializer){

            @Override
            public Map.Entry<UK, UV> next() {
                return this.nextEntry();
            }
        };
    }

    @Override
    public void clear() {
        try (RocksIteratorWrapper iterator2 = RocksDBKeyedStateBackend.getRocksIterator(this.backend.db, this.columnFamily);
             WriteBatch writeBatch = new WriteBatch(128);){
            byte[] keyBytes;
            byte[] keyPrefixBytes = this.serializeCurrentKeyAndNamespace();
            iterator2.seek(keyPrefixBytes);
            while (iterator2.isValid() && this.startWithKeyPrefix(keyPrefixBytes, keyBytes = iterator2.key())) {
                writeBatch.remove(this.columnFamily, keyBytes);
                iterator2.next();
            }
            this.backend.db.write(this.writeOptions, writeBatch);
        }
        catch (Exception e) {
            LOG.warn("Error while cleaning the state.", e);
        }
    }

    @Override
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<K> safeKeySerializer, TypeSerializer<N> safeNamespaceSerializer, TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
        Preconditions.checkNotNull((Object)serializedKeyAndNamespace);
        Preconditions.checkNotNull(safeKeySerializer);
        Preconditions.checkNotNull(safeNamespaceSerializer);
        Preconditions.checkNotNull(safeValueSerializer);
        Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace((byte[])serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup((Object)keyAndNamespace.f0, (int)this.backend.getNumberOfKeyGroups());
        ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper((OutputStream)outputStream);
        this.writeKeyWithGroupAndNamespace(keyGroup, keyAndNamespace.f0, safeKeySerializer, keyAndNamespace.f1, safeNamespaceSerializer, outputStream, (DataOutputView)outputView);
        byte[] keyPrefixBytes = outputStream.toByteArray();
        MapSerializer serializer = (MapSerializer)safeValueSerializer;
        TypeSerializer dupUserKeySerializer = serializer.getKeySerializer();
        TypeSerializer dupUserValueSerializer = serializer.getValueSerializer();
        RocksDBMapIterator iterator2 = new RocksDBMapIterator<Map.Entry<UK, UV>>(this.backend.db, keyPrefixBytes, dupUserKeySerializer, dupUserValueSerializer){

            @Override
            public Map.Entry<UK, UV> next() {
                return this.nextEntry();
            }
        };
        if (!iterator2.hasNext()) {
            return null;
        }
        return KvStateSerializer.serializeMap(() -> iterator2, (TypeSerializer)dupUserKeySerializer, (TypeSerializer)dupUserValueSerializer);
    }

    private byte[] serializeCurrentKeyAndNamespace() throws IOException {
        this.writeCurrentKeyWithGroupAndNamespace();
        return this.keySerializationStream.toByteArray();
    }

    private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
        this.serializeCurrentKeyAndNamespace();
        this.userKeySerializer.serialize(userKey, this.keySerializationDataOutputView);
        return this.keySerializationStream.toByteArray();
    }

    private byte[] serializeUserValue(UV userValue) throws IOException {
        return this.serializeUserValue(userValue, this.userValueSerializer);
    }

    private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
        return this.deserializeUserValue(rawValueBytes, this.userValueSerializer);
    }

    private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
        this.keySerializationStream.reset();
        if (userValue == null) {
            this.keySerializationDataOutputView.writeBoolean(true);
        } else {
            this.keySerializationDataOutputView.writeBoolean(false);
            valueSerializer.serialize(userValue, this.keySerializationDataOutputView);
        }
        return this.keySerializationStream.toByteArray();
    }

    private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
        ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
        DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)bais);
        in.skipBytes(userKeyOffset);
        return (UK)keySerializer.deserialize((DataInputView)in);
    }

    private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
        ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
        DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)bais);
        boolean isNull = in.readBoolean();
        return (UV)(isNull ? null : valueSerializer.deserialize((DataInputView)in));
    }

    private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
        if (rawKeyBytes.length < keyPrefixBytes.length) {
            return false;
        }
        int i = keyPrefixBytes.length;
        while (--i >= this.backend.getKeyGroupPrefixBytes()) {
            if (rawKeyBytes[i] == keyPrefixBytes[i]) continue;
            return false;
        }
        return true;
    }

    static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult, RocksDBKeyedStateBackend<K> backend) {
        return (IS)new RocksDBMapState<K, N, UK, UV>((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getStateSerializer(), (Map)stateDesc.getDefaultValue(), backend);
    }

    static class StateSnapshotTransformerWrapper
    implements StateSnapshotTransformer<byte[]> {
        private static final byte[] NULL_VALUE;
        private static final byte NON_NULL_VALUE_PREFIX;
        private final StateSnapshotTransformer<byte[]> elementTransformer;
        private final ByteArrayDataInputView div;

        StateSnapshotTransformerWrapper(StateSnapshotTransformer<byte[]> originalTransformer) {
            this.elementTransformer = originalTransformer;
            this.div = new ByteArrayDataInputView();
        }

        @Nullable
        public byte[] filterOrTransform(@Nullable byte[] value2) {
            if (value2 == null || this.isNull(value2)) {
                return NULL_VALUE;
            }
            byte[] woNullByte = Arrays.copyOfRange(value2, 1, value2.length);
            byte[] filteredValue = (byte[])this.elementTransformer.filterOrTransform((Object)woNullByte);
            filteredValue = filteredValue == null ? NULL_VALUE : (filteredValue != woNullByte ? StateSnapshotTransformerWrapper.prependWithNonNullByte(filteredValue, value2) : value2);
            return filteredValue;
        }

        private boolean isNull(byte[] value2) {
            try {
                this.div.setData(value2, 0, 1);
                return this.div.readBoolean();
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Failed to deserialize boolean flag of map user null value", (Throwable)e);
            }
        }

        private static byte[] prependWithNonNullByte(byte[] value2, byte[] reuse) {
            int len = 1 + value2.length;
            byte[] result2 = reuse.length == len ? reuse : new byte[len];
            result2[0] = NON_NULL_VALUE_PREFIX;
            System.arraycopy(value2, 0, result2, 1, value2.length);
            return result2;
        }

        static {
            ByteArrayDataOutputView dov = new ByteArrayDataOutputView(1);
            try {
                dov.writeBoolean(true);
                NULL_VALUE = dov.toByteArray();
                dov.reset();
                dov.writeBoolean(false);
                NON_NULL_VALUE_PREFIX = dov.toByteArray()[0];
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Failed to serialize boolean flag of map user null value", (Throwable)e);
            }
        }
    }

    private abstract class RocksDBMapIterator<T>
    implements Iterator<T> {
        private static final int CACHE_SIZE_LIMIT = 128;
        private final RocksDB db;
        @Nonnull
        private final byte[] keyPrefixBytes;
        private boolean expired = false;
        private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList();
        private RocksDBMapEntry currentEntry;
        private int cacheIndex = 0;
        private final TypeSerializer<UK> keySerializer;
        private final TypeSerializer<UV> valueSerializer;

        RocksDBMapIterator(RocksDB db, byte[] keyPrefixBytes, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
            this.db = db;
            this.keyPrefixBytes = keyPrefixBytes;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }

        @Override
        public boolean hasNext() {
            this.loadCache();
            return this.cacheIndex < this.cacheEntries.size();
        }

        @Override
        public void remove() {
            if (this.currentEntry == null || this.currentEntry.deleted) {
                throw new IllegalStateException("The remove operation must be called after a valid next operation.");
            }
            this.currentEntry.remove();
        }

        final RocksDBMapEntry nextEntry() {
            this.loadCache();
            if (this.cacheIndex == this.cacheEntries.size()) {
                if (!this.expired) {
                    throw new IllegalStateException();
                }
                return null;
            }
            this.currentEntry = this.cacheEntries.get(this.cacheIndex);
            ++this.cacheIndex;
            return this.currentEntry;
        }

        private void loadCache() {
            if (this.cacheIndex > this.cacheEntries.size()) {
                throw new IllegalStateException();
            }
            if (this.cacheIndex < this.cacheEntries.size() || this.expired) {
                return;
            }
            try (RocksIteratorWrapper iterator2 = RocksDBKeyedStateBackend.getRocksIterator(this.db, RocksDBMapState.this.columnFamily);){
                byte[] startBytes = this.currentEntry == null ? this.keyPrefixBytes : this.currentEntry.rawKeyBytes;
                this.cacheEntries.clear();
                this.cacheIndex = 0;
                iterator2.seek(startBytes);
                if (this.currentEntry != null && !this.currentEntry.deleted) {
                    iterator2.next();
                }
                while (true) {
                    if (!iterator2.isValid() || !RocksDBMapState.this.startWithKeyPrefix(this.keyPrefixBytes, iterator2.key())) {
                        this.expired = true;
                        break;
                    }
                    if (this.cacheEntries.size() >= 128) {
                        break;
                    }
                    RocksDBMapEntry entry = new RocksDBMapEntry(this.db, this.keyPrefixBytes.length, iterator2.key(), iterator2.value(), this.keySerializer, this.valueSerializer);
                    this.cacheEntries.add(entry);
                    iterator2.next();
                }
            }
        }
    }

    private class RocksDBMapEntry
    implements Map.Entry<UK, UV> {
        private final RocksDB db;
        private final byte[] rawKeyBytes;
        private byte[] rawValueBytes;
        private boolean deleted;
        private UK userKey;
        private UV userValue;
        private final int userKeyOffset;
        private TypeSerializer<UK> keySerializer;
        private TypeSerializer<UV> valueSerializer;

        RocksDBMapEntry(@Nonnegative RocksDB db, @Nonnull int userKeyOffset, @Nonnull byte[] rawKeyBytes, @Nonnull byte[] rawValueBytes, @Nonnull TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
            this.db = db;
            this.userKeyOffset = userKeyOffset;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
            this.rawKeyBytes = rawKeyBytes;
            this.rawValueBytes = rawValueBytes;
            this.deleted = false;
        }

        public void remove() {
            this.deleted = true;
            this.rawValueBytes = null;
            try {
                this.db.delete(RocksDBMapState.this.columnFamily, RocksDBMapState.this.writeOptions, this.rawKeyBytes);
            }
            catch (RocksDBException e) {
                throw new FlinkRuntimeException("Error while removing data from RocksDB.", (Throwable)e);
            }
        }

        @Override
        public UK getKey() {
            if (this.userKey == null) {
                try {
                    this.userKey = RocksDBMapState.this.deserializeUserKey(this.userKeyOffset, this.rawKeyBytes, this.keySerializer);
                }
                catch (IOException e) {
                    throw new FlinkRuntimeException("Error while deserializing the user key.", (Throwable)e);
                }
            }
            return this.userKey;
        }

        @Override
        public UV getValue() {
            if (this.deleted) {
                return null;
            }
            if (this.userValue == null) {
                try {
                    this.userValue = RocksDBMapState.this.deserializeUserValue(this.rawValueBytes, this.valueSerializer);
                }
                catch (IOException e) {
                    throw new FlinkRuntimeException("Error while deserializing the user value.", (Throwable)e);
                }
            }
            return this.userValue;
        }

        @Override
        public UV setValue(UV value2) {
            if (this.deleted) {
                throw new IllegalStateException("The value has already been deleted.");
            }
            Object oldValue = this.getValue();
            try {
                this.userValue = value2;
                this.rawValueBytes = RocksDBMapState.this.serializeUserValue(value2, this.valueSerializer);
                this.db.put(RocksDBMapState.this.columnFamily, RocksDBMapState.this.writeOptions, this.rawKeyBytes, this.rawValueBytes);
            }
            catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while putting data into RocksDB.", (Throwable)e);
            }
            return oldValue;
        }
    }
}

