[jira] [Created] (FLINK-20772) [DISCUSS] RocksDBValueState with TTL occurs NullPointerException when calling update(null) method

Shang Yuanchun (Jira)
Seongbae Chang created FLINK-20772:
--------------------------------------

             Summary: [DISCUSS] RocksDBValueState with TTL occurs NullPointerException when calling update(null) method
                 Key: FLINK-20772
                 URL: https://issues.apache.org/jira/browse/FLINK-20772
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.11.2
         Environment: Flink version: 1.11.2

Flink Cluster: Standalone cluster with 3 Job managers and Task managers on CentOS 7
            Reporter: Seongbae Chang


h2. Problem
 * I use ValueState in my custom trigger and have set a TTL on these ValueStates, with RocksDB as the state backend.
 * I hit an error with the code below. As far as I know, ValueState.update(null) is generally equivalent to ValueState.clear(). Unfortunately, once TTL is enabled, this call fails instead.

{code:java}
// My Code
ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
{code}
 * I tested this on Flink 1.11.2, but I suspect later versions are affected as well.
 * Also, I'm a beginner, so if anything in this discussion issue is wrong, please let me know and I'll fix it!

{code:java}
// Error Stacktrace
Caused by: TimerException{org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB}
        ... 12 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
        at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
        at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:102)
        at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:29)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onProcessingTime(WindowOperator.java:902)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
        ... 11 more
Caused by: java.lang.NullPointerException
        at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:69)
        at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
        at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
        ... 18 more
{code}
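Until this is resolved, one possible workaround is to never pass null to update() and call clear() explicitly instead. The minimal sketch below assumes a hypothetical helper named updateOrClear (not part of Flink). SimpleValueState is a local stand-in that mirrors the value(), update(T) and clear() methods of Flink's ValueState so the example compiles without Flink, and NullRejectingState is a toy that fails on null the way the TTL-wrapped RocksDB state does here.

{code:java}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateOrClearSketch {

    /** Stand-in for Flink's ValueState<T>: same method names, not the real interface. */
    interface SimpleValueState<T> {
        T value() throws IOException;
        void update(T value) throws IOException;
        void clear();
    }

    /**
     * Hypothetical helper: never passes null to update(), so a TTL-wrapped state
     * is cleared instead of being asked to serialize a null user value.
     */
    static <T> void updateOrClear(SimpleValueState<T> state, T value) throws IOException {
        if (value == null) {
            state.clear();
        } else {
            state.update(value);
        }
    }

    /** Toy in-memory state that, like the TTL-wrapped RocksDB state, fails on update(null). */
    static final class NullRejectingState<T> implements SimpleValueState<T> {
        private final Map<String, T> store = new HashMap<>();

        @Override
        public T value() {
            return store.get("current-key");
        }

        @Override
        public void update(T value) {
            if (value == null) {
                throw new NullPointerException("simulated serializer failure on null");
            }
            store.put("current-key", value);
        }

        @Override
        public void clear() {
            store.remove("current-key");
        }
    }

    public static void main(String[] args) throws IOException {
        NullRejectingState<Integer> batchTotalSize = new NullRejectingState<>();
        updateOrClear(batchTotalSize, 42);
        System.out.println(batchTotalSize.value()); // prints 42
        updateOrClear(batchTotalSize, null);        // clears instead of throwing
        System.out.println(batchTotalSize.value()); // prints null
    }
}
{code}

With Flink on the classpath, the helper's parameter would be typed as Flink's ValueState<T> rather than the stand-in.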
 
h2. Reason
 * The problem lies in how RocksDBValueState interacts with TtlValueState.
 * In RocksDBValueState (as in the other ValueState implementations), *update(null)* is supposed to be caught by the null check at the top of the method, which calls clear() instead. With TTL enabled, however, this null check is skipped, and the state then tries to serialize the null value.

{code:java}
// https://github.com/apache/flink/blob/release-1.11/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L96-L110

@Override
public void update(V value) {
    if (value == null) {
        clear();
        return;
    }
 
    try {
        backend.db.put(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace(), serializeValue(value));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error while adding data to RocksDB", e);      
    }
}{code}
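 * This happens because TtlValueState wraps the value (here, null) together with the last access timestamp in a new TtlValue object. The object passed to RocksDBValueState is therefore not null, even though its user value is, and serializing that user value (with IntSerializer in the stack trace above) throws the NullPointerException.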

{code:java}
// https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L47-L51

@Override
public void update(T value) throws IOException {
    accessCallback.run();
    original.update(wrapWithTs(value));
}
{code}
{code:java}
// https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java#L46-L48

static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    return new TtlValue<>(value, ts);
}{code}
{code:java}
// https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java

public class TtlValue<T> implements Serializable {
    private static final long serialVersionUID = 5221129704201125020L;

    @Nullable
    private final T userValue;
    private final long lastAccessTimestamp;

    public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
        this.userValue = userValue;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }

    @Nullable
    public T getUserValue() {
        return userValue;
    }

    public long getLastAccessTimestamp() {
        return lastAccessTimestamp;
    }
}
{code}
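To make this chain concrete, here is a heavily simplified, self-contained model of it. All class names are hypothetical stand-ins, not Flink's code: TtlWrapper mirrors TtlValue, InnerValueState plays the role of RocksDBValueState (null check, then serialize), TtlDecoratedState plays the role of TtlValueState (always wraps), and serialize mimics the composite TTL serializer delegating to an int serializer that unboxes the user value.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class TtlNullModel {

    /** Mirrors TtlValue: a user value that may be null, plus a timestamp. */
    static final class TtlWrapper {
        final Integer userValue; // may be null, like TtlValue#userValue
        final long lastAccessTimestamp;

        TtlWrapper(Integer userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Mimics the composite TTL serializer: writes the user value, then the timestamp. */
    static byte[] serialize(TtlWrapper value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(value.userValue); // auto-unboxing a null Integer throws NullPointerException
        out.writeLong(value.lastAccessTimestamp);
        return bytes.toByteArray();
    }

    /** Plays the role of RocksDBValueState: null check first, then serialize and store. */
    static final class InnerValueState {
        final Map<String, byte[]> db = new HashMap<>();

        void update(TtlWrapper value) throws IOException {
            if (value == null) { // the guard that update(null) is meant to hit
                db.remove("key");
                return;
            }
            db.put("key", serialize(value));
        }
    }

    /** Plays the role of TtlValueState: always wraps before delegating. */
    static final class TtlDecoratedState {
        final InnerValueState original = new InnerValueState();

        void update(Integer value) throws IOException {
            // Wrapping null yields a non-null wrapper, so the inner guard is bypassed.
            original.update(new TtlWrapper(value, System.currentTimeMillis()));
        }
    }

    public static void main(String[] args) throws IOException {
        TtlDecoratedState state = new TtlDecoratedState();
        state.update(42); // stored without problems
        try {
            state.update(null);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in the reported stack trace");
        }
    }
}
{code}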
 * In conclusion, I think the null-checking logic should be changed so that it checks whether the userValue field inside TtlValue is null.
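A sketch of one way to restore the update(null) contract, again on a simplified self-contained model rather than Flink's actual classes. Instead of teaching the backend about TtlValue (the placement suggested above), this variant moves the null check into the TTL decorator, before anything is wrapped, so update(null) is forwarded as clear(). For this case both placements should have the same observable effect. The class names are hypothetical, and the model ignores expiry and the access callback.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class TtlNullFixSketch {

    /** Mirrors TtlValue: user value plus last access timestamp. */
    static final class TtlWrapper {
        final Integer userValue;
        final long lastAccessTimestamp;

        TtlWrapper(Integer userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Stand-in for the backend state; like the real serializer, it cannot handle a null user value. */
    static final class InnerValueState {
        final Map<String, TtlWrapper> db = new HashMap<>();

        void update(TtlWrapper value) {
            if (value == null) {
                clear();
                return;
            }
            // Stands in for serialization, which fails on a null user value.
            Objects.requireNonNull(value.userValue, "serializer cannot handle null");
            db.put("key", value);
        }

        void clear() {
            db.remove("key");
        }
    }

    /** TTL decorator with the null check moved in front of the wrapping step. */
    static final class FixedTtlState {
        final InnerValueState original = new InnerValueState();

        void update(Integer value) {
            if (value == null) { // honour update(null) == clear()
                original.clear();
                return;
            }
            original.update(new TtlWrapper(value, System.currentTimeMillis()));
        }

        Integer value() {
            TtlWrapper stored = original.db.get("key");
            return stored == null ? null : stored.userValue;
        }
    }

    public static void main(String[] args) {
        FixedTtlState state = new FixedTtlState();
        state.update(42);
        System.out.println(state.value()); // prints 42
        state.update(null);                // clears instead of throwing
        System.out.println(state.value()); // prints null
    }
}
{code}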

 

I hope this helps improve Flink, and if I get the chance, I'd like to fix it myself!

Thank you all, and merry Christmas!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)