/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.core.memory.ByteArrayDataInputView;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

/**
 * {@link ListState} implementation whose contents are stored in RocksDB.
 *
 * <p>A list is persisted as a single byte array per (key group, key, namespace): the serialized
 * elements are written back to back with one {@link #DELIMITER} byte between neighbouring
 * elements. Single elements and whole lists are appended through the RocksDB {@code merge}
 * call; the code here never writes a delimiter around a merged operand, so it presumably relies
 * on the configured RocksDB merge operator to insert it (NOTE(review): confirm against the
 * backend's column family options).
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <V> The type of the elements in the list.
 */
class RocksDBListState<K, N, V>
extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
implements InternalListState<K, N, V> {
    /** Serializer for the individual list elements. */
    private final TypeSerializer<V> elementSerializer;

    /** Separator byte (ASCII comma, value 44) placed between two serialized elements. */
    private static final byte DELIMITER = ',';

    /**
     * Creates a new list state bound to the given column family.
     *
     * @param columnFamily The RocksDB column family that holds this state.
     * @param namespaceSerializer The serializer for the namespace.
     * @param valueSerializer The serializer for the whole list.
     * @param defaultValue The default value handed to the base class.
     * @param elementSerializer The serializer for a single list element.
     * @param backend The backend that owns the RocksDB instance and the current key context.
     */
    private RocksDBListState(
            ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<List<V>> valueSerializer,
            List<V> defaultValue,
            TypeSerializer<V> elementSerializer,
            RocksDBKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        this.elementSerializer = elementSerializer;
    }

    /** Returns the key serializer of the owning backend. */
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return backend.getKeySerializer();
    }

    /** Returns the serializer used for namespaces. */
    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializer;
    }

    /** Returns the serializer for the whole list value. */
    @Override
    public TypeSerializer<List<V>> getValueSerializer() {
        return valueSerializer;
    }

    /**
     * Returns the elements stored for the current key and namespace.
     *
     * @return the stored elements, or {@code null} if RocksDB holds no entry.
     */
    @Override
    public Iterable<V> get() {
        return getInternal();
    }

    /**
     * Reads and deserializes the list stored for the current key and namespace.
     *
     * @return the stored list, or {@code null} if RocksDB holds no entry.
     * @throws FlinkRuntimeException if serializing the key or reading from RocksDB fails.
     */
    @Override
    public List<V> getInternal() {
        try {
            writeCurrentKeyWithGroupAndNamespace();
            byte[] key = keySerializationStream.toByteArray();
            byte[] valueBytes = backend.db.get(columnFamily, key);
            return deserializeList(valueBytes, elementSerializer);
        } catch (IOException | RocksDBException e) {
            throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
        }
    }

    /**
     * Decodes a delimiter-separated byte array into a list of elements.
     *
     * @param valueBytes the raw bytes read from RocksDB, may be {@code null}.
     * @param elementSerializer the serializer for a single element.
     * @return the decoded elements, or {@code null} if {@code valueBytes} is {@code null}.
     */
    private static <V> List<V> deserializeList(byte[] valueBytes, TypeSerializer<V> elementSerializer) {
        if (valueBytes == null) {
            return null;
        }

        ByteArrayDataInputView in = new ByteArrayDataInputView(valueBytes);
        List<V> result = new ArrayList<>();
        V next;
        while ((next = deserializeNextElement(in, elementSerializer)) != null) {
            result.add(next);
        }
        return result;
    }

    /**
     * Reads the next element from the input and skips the delimiter byte that follows it.
     *
     * @param in the input positioned at the start of an element or at the end of the data.
     * @param elementSerializer the serializer for a single element.
     * @return the next element, or {@code null} once no bytes are left.
     * @throws FlinkRuntimeException if the element cannot be deserialized.
     */
    private static <V> V deserializeNextElement(DataInputViewStreamWrapper in, TypeSerializer<V> elementSerializer) {
        try {
            if (in.available() > 0) {
                V element = elementSerializer.deserialize(in);
                // Consume the separator so the next call starts at the following element.
                if (in.available() > 0) {
                    in.readByte();
                }
                return element;
            }
        } catch (IOException e) {
            // Keep the cause: without it the actual deserialization problem is untraceable.
            throw new FlinkRuntimeException("Unexpected list element deserialization failure", e);
        }
        return null;
    }

    /**
     * Appends a single element to the list of the current key and namespace.
     *
     * @param value the element to append, must not be {@code null}.
     * @throws FlinkRuntimeException if serialization or the RocksDB merge fails.
     */
    @Override
    public void add(V value) {
        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");

        try {
            writeCurrentKeyWithGroupAndNamespace();
            byte[] key = keySerializationStream.toByteArray();
            // The key stream is reused as scratch space for the serialized element.
            keySerializationStream.reset();
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
            elementSerializer.serialize(value, out);
            backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
        }
    }

    /**
     * Moves the lists of all source namespaces into the target namespace for the current key.
     *
     * <p>Each source entry is read, deleted, and (if it existed) merged as raw bytes into the
     * target entry. {@code null} source namespaces are skipped.
     *
     * @param target the namespace that receives the merged data.
     * @param sources the namespaces to merge; {@code null} or empty is a no-op.
     * @throws FlinkRuntimeException if any serialization or RocksDB operation fails.
     */
    @Override
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }

        // Capture the key context once; it is the same for the target and every source.
        final K key = backend.getCurrentKey();
        final int keyGroup = backend.getCurrentKeyGroupIndex();

        try {
            writeKeyWithGroupAndNamespace(
                    keyGroup, key, target, keySerializationStream, keySerializationDataOutputView);
            final byte[] targetKey = keySerializationStream.toByteArray();

            for (N source : sources) {
                if (source == null) {
                    continue;
                }

                writeKeyWithGroupAndNamespace(
                        keyGroup, key, source, keySerializationStream, keySerializationDataOutputView);
                byte[] sourceKey = keySerializationStream.toByteArray();
                byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
                backend.db.delete(columnFamily, writeOptions, sourceKey);

                if (valueBytes != null) {
                    backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);
                }
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
        }
    }

    /**
     * Replaces the stored list with the given values.
     *
     * @param valueToStore the new content, must not be {@code null}.
     */
    @Override
    public void update(List<V> valueToStore) {
        updateInternal(valueToStore);
    }

    /**
     * Clears the current entry and, if {@code values} is non-empty, stores it as the new list.
     *
     * @param values the new content, must not be {@code null}; elements must not be {@code null}.
     * @throws FlinkRuntimeException if serialization or the RocksDB put fails.
     */
    @Override
    public void updateInternal(List<V> values) {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");

        clear();

        if (!values.isEmpty()) {
            try {
                writeCurrentKeyWithGroupAndNamespace();
                byte[] key = keySerializationStream.toByteArray();
                byte[] premerge = getPreMergedValue(values, elementSerializer, keySerializationStream);
                if (premerge == null) {
                    throw new IOException("Failed pre-merge values in update()");
                }
                backend.db.put(columnFamily, writeOptions, key, premerge);
            } catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
            }
        }
    }

    /**
     * Appends all given values to the list of the current key and namespace.
     *
     * @param values the elements to append, must not be {@code null}; an empty list is a no-op.
     * @throws FlinkRuntimeException if serialization or the RocksDB merge fails.
     */
    @Override
    public void addAll(List<V> values) {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");

        if (!values.isEmpty()) {
            try {
                writeCurrentKeyWithGroupAndNamespace();
                byte[] key = keySerializationStream.toByteArray();
                byte[] premerge = getPreMergedValue(values, elementSerializer, keySerializationStream);
                if (premerge == null) {
                    throw new IOException("Failed pre-merge values in addAll()");
                }
                backend.db.merge(columnFamily, writeOptions, key, premerge);
            } catch (IOException | RocksDBException e) {
                throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
            }
        }
    }

    /**
     * Serializes all values into one byte array, separated by {@link #DELIMITER}.
     *
     * <p>The given stream is reset first and used as the scratch buffer, so any bytes it held
     * before the call are discarded.
     *
     * @param values the elements to serialize; none of them may be {@code null}.
     * @param elementSerializer the serializer for a single element.
     * @param keySerializationStream the reusable buffer to serialize into.
     * @return the delimiter-separated serialized elements.
     * @throws IOException if an element cannot be serialized.
     */
    private static <V> byte[] getPreMergedValue(
            List<V> values,
            TypeSerializer<V> elementSerializer,
            ByteArrayOutputStreamWithPos keySerializationStream) throws IOException {
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

        keySerializationStream.reset();
        boolean first = true;
        for (V value : values) {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            if (first) {
                first = false;
            } else {
                // Same byte that deserializeNextElement skips between elements.
                keySerializationStream.write(DELIMITER);
            }
            elementSerializer.serialize(value, out);
        }

        return keySerializationStream.toByteArray();
    }

    /**
     * Factory that builds a {@code RocksDBListState} from a state descriptor and the result of
     * registering the state with the backend.
     *
     * @param stateDesc the descriptor; it is cast to a {@link ListStateDescriptor}.
     * @param registerResult the column family handle and registered meta info of the state.
     * @param backend the backend that owns the state.
     * @return the new state, cast to the caller's expected internal state type.
     */
    @SuppressWarnings("unchecked")
    static <E, K, N, SV, S extends State, IS extends S> IS create(
            StateDescriptor<S, SV> stateDesc,
            Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
            RocksDBKeyedStateBackend<K> backend) {
        // The casts are unchecked: callers are expected to pass a list state descriptor only.
        return (IS) new RocksDBListState<>(
                registerResult.f0,
                registerResult.f1.getNamespaceSerializer(),
                (TypeSerializer<List<E>>) registerResult.f1.getStateSerializer(),
                (List<E>) stateDesc.getDefaultValue(),
                ((ListStateDescriptor<E>) stateDesc).getElementSerializer(),
                backend);
    }

    /**
     * Applies an element-level {@link StateSnapshotTransformer} to a serialized list.
     *
     * @param <T> The type of the list elements.
     */
    static class StateSnapshotTransformerWrapper<T>
    implements StateSnapshotTransformer<byte[]> {
        /** Transformer applied to every deserialized element. */
        private final StateSnapshotTransformer<T> elementTransformer;

        /** Serializer for a single list element. */
        private final TypeSerializer<T> elementSerializer;

        /** Reusable buffer for re-serializing the surviving elements. */
        private final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(128);

        /** Strategy taken from the transformer, or {@code TRANSFORM_ALL} if it provides none. */
        private final StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;

        /**
         * @param elementTransformer the transformer to apply to each element.
         * @param elementSerializer the serializer for a single element.
         */
        StateSnapshotTransformerWrapper(StateSnapshotTransformer<T> elementTransformer, TypeSerializer<T> elementSerializer) {
            this.elementTransformer = elementTransformer;
            this.elementSerializer = elementSerializer;
            this.transformStrategy =
                    elementTransformer instanceof StateSnapshotTransformer.CollectionStateSnapshotTransformer
                            ? ((StateSnapshotTransformer.CollectionStateSnapshotTransformer<?>) elementTransformer).getFilterStrategy()
                            : StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.TRANSFORM_ALL;
        }

        /**
         * Filters or transforms every element of the serialized list.
         *
         * @param value the serialized list, may be {@code null}.
         * @return the re-serialized surviving elements; the untouched tail of {@code value}
         *     starting at the first retained element when the strategy is
         *     {@code STOP_ON_FIRST_INCLUDED}; or {@code null} if the input is {@code null} or no
         *     element survives.
         * @throws FlinkRuntimeException if an element cannot be (de)serialized.
         */
        @Override
        @Nullable
        public byte[] filterOrTransform(@Nullable byte[] value) {
            if (value == null) {
                return null;
            }

            List<T> result = new ArrayList<>();
            ByteArrayDataInputView in = new ByteArrayDataInputView(value);
            T next;
            // Start offset of the element currently being examined.
            int prevPosition = 0;
            while ((next = deserializeNextElement(in, elementSerializer)) != null) {
                T transformedElement = elementTransformer.filterOrTransform(next);
                if (transformedElement != null) {
                    if (transformStrategy == StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED) {
                        // Everything from this element on is kept verbatim.
                        return Arrays.copyOfRange(value, prevPosition, value.length);
                    }
                    result.add(transformedElement);
                }
                prevPosition = in.getPosition();
            }

            try {
                return result.isEmpty() ? null : getPreMergedValue(result, elementSerializer, out);
            } catch (IOException e) {
                throw new FlinkRuntimeException("Failed to serialize transformed list", e);
            }
        }
    }
}

