/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.runtime.state.KvState;

/**
 * A {@link ReducingState} implemented on top of a wrapped {@link ValueState}.
 *
 * <p>The wrapped state holds the single running value. Each call to {@link #add(Object)}
 * either stores the incoming value (when the wrapped state currently holds {@code null}) or
 * stores the result of applying the {@link ReduceFunction} to the current value and the
 * incoming value. The {@link KvState} operations are forwarded to the wrapped state, which
 * must therefore also be a {@link KvState}.
 *
 * @param <N> type of the namespace accepted by {@link #setCurrentNamespace(Object)}
 * @param <T> type of the values added to and read from this state
 * @param <W> type of the wrapped state; both a {@code ValueState<T>} and a {@code KvState<N>}
 */
public class GenericReducingState<N, T, W extends ValueState<T> & KvState<N>>
implements ReducingState<T>,
KvState<N> {
    private final W wrappedState;
    private final ReduceFunction<T> reduceFunction;

    /**
     * Creates a reducing state that stores its running value in {@code wrappedState}.
     *
     * @param wrappedState the value state to delegate to; must also implement {@link KvState}
     * @param reduceFunction the function used to combine the current value with an added value
     * @throws IllegalArgumentException if {@code wrappedState} is not a {@link KvState}
     *     (this includes {@code null}, which fails the {@code instanceof} check)
     */
    @SuppressWarnings("unchecked")
    public GenericReducingState(ValueState<T> wrappedState, ReduceFunction<T> reduceFunction) {
        if (!(wrappedState instanceof KvState)) {
            throw new IllegalArgumentException("Wrapped state must be a KvState.");
        }
        // The instanceof check above establishes the KvState half of W's bound at runtime.
        // The namespace type argument N is erased and cannot be verified, hence the
        // unchecked cast.
        this.wrappedState = (W) wrappedState;
        this.reduceFunction = reduceFunction;
    }

    /**
     * Forwards the namespace to the wrapped state.
     *
     * @param namespace the namespace to pass on
     */
    @Override
    public void setCurrentNamespace(N namespace) {
        this.wrappedState.setCurrentNamespace(namespace);
    }

    /**
     * Forwards the request to the wrapped state and returns its result unchanged.
     *
     * @param serializedKeyAndNamespace the serialized key and namespace to pass on
     * @return whatever the wrapped state returns for the given argument
     * @throws Exception if the wrapped state throws
     */
    @Override
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
        return this.wrappedState.getSerializedValue(serializedKeyAndNamespace);
    }

    /**
     * Returns the value currently held by the wrapped state.
     *
     * @return the wrapped state's current value
     * @throws Exception if reading the wrapped state fails
     */
    @Override
    public T get() throws Exception {
        return this.wrappedState.value();
    }

    /**
     * Adds a value to this state.
     *
     * <p>If the wrapped state currently holds {@code null}, {@code value} is stored as is;
     * otherwise the result of {@code reduceFunction.reduce(current, value)} is stored.
     *
     * @param value the value to add
     * @throws Exception if reading or updating the wrapped state, or the reduce function, fails
     */
    @Override
    public void add(T value) throws Exception {
        T currentValue = this.wrappedState.value();
        if (currentValue == null) {
            // Nothing to combine with yet: the first value becomes the running value.
            this.wrappedState.update(value);
        } else {
            this.wrappedState.update(this.reduceFunction.reduce(currentValue, value));
        }
    }

    /** Clears the wrapped state. */
    @Override
    public void clear() {
        this.wrappedState.clear();
    }
}

