/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.FlinkRuntimeException;
import org.rocksdb.ColumnFamilyHandle;

class RocksDBReducingState<K, N, V>
extends AbstractRocksDBAppendingState<K, N, V, V, V, ReducingState<V>>
implements InternalReducingState<K, N, V> {
    private final ReduceFunction<V> reduceFunction;

    private RocksDBReducingState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer, V defaultValue, ReduceFunction<V> reduceFunction, RocksDBKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        this.reduceFunction = reduceFunction;
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    public TypeSerializer<V> getValueSerializer() {
        return this.valueSerializer;
    }

    public V get() throws IOException {
        return (V)this.getInternal();
    }

    public void add(V value2) throws Exception {
        byte[] key = this.getKeyBytes();
        Object oldValue = this.getInternal(key);
        V newValue = oldValue == null ? value2 : this.reduceFunction.reduce(oldValue, value2);
        this.updateInternal(key, newValue);
    }

    public void mergeNamespaces(N target, Collection<N> sources2) throws Exception {
        if (sources2 == null || sources2.isEmpty()) {
            return;
        }
        Object key = this.backend.getCurrentKey();
        int keyGroup = this.backend.getCurrentKeyGroupIndex();
        try {
            Object current = null;
            for (N source2 : sources2) {
                if (source2 == null) continue;
                this.writeKeyWithGroupAndNamespace(keyGroup, key, source2, this.keySerializationStream, this.keySerializationDataOutputView);
                byte[] sourceKey = this.keySerializationStream.toByteArray();
                byte[] valueBytes = this.backend.db.get(this.columnFamily, sourceKey);
                this.backend.db.delete(this.columnFamily, this.writeOptions, sourceKey);
                if (valueBytes == null) continue;
                Object value2 = this.valueSerializer.deserialize((DataInputView)new DataInputViewStreamWrapper((InputStream)new ByteArrayInputStreamWithPos(valueBytes)));
                if (current != null) {
                    current = this.reduceFunction.reduce(current, value2);
                    continue;
                }
                current = value2;
            }
            if (current != null) {
                this.writeKeyWithGroupAndNamespace(keyGroup, key, target, this.keySerializationStream, this.keySerializationDataOutputView);
                byte[] targetKey = this.keySerializationStream.toByteArray();
                byte[] targetValueBytes = this.backend.db.get(this.columnFamily, targetKey);
                if (targetValueBytes != null) {
                    Object value3 = this.valueSerializer.deserialize((DataInputView)new DataInputViewStreamWrapper((InputStream)new ByteArrayInputStreamWithPos(targetValueBytes)));
                    current = this.reduceFunction.reduce(current, value3);
                }
                this.keySerializationStream.reset();
                this.valueSerializer.serialize(current, this.keySerializationDataOutputView);
                this.backend.db.put(this.columnFamily, this.writeOptions, targetKey, this.keySerializationStream.toByteArray());
            }
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Error while merging state in RocksDB", (Throwable)e);
        }
    }

    static <K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult, RocksDBKeyedStateBackend<K> backend) {
        return (IS)new RocksDBReducingState<K, N, Object>((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getStateSerializer(), stateDesc.getDefaultValue(), ((ReducingStateDescriptor)stateDesc).getReduceFunction(), backend);
    }
}

