Seongbae Chang created FLINK-20772:
--------------------------------------

             Summary: [DISCUSS] RocksDBValueState with TTL occurs NullPointerException when calling update(null) method
                 Key: FLINK-20772
                 URL: https://issues.apache.org/jira/browse/FLINK-20772
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.11.2
         Environment: Flink version: 1.11.2
Flink Cluster: Standalone cluster with 3 JobManagers and TaskManagers on CentOS 7
            Reporter: Seongbae Chang


h2. Problem

* I use ValueState in my custom trigger and set a TTL for it, with the RocksDB state backend.
* I hit an error with the code below. As far as I know, ValueState.update(null) normally behaves the same as ValueState.clear(). Unfortunately, the error occurs once TTL is enabled (a fuller, hypothetical reproduction sketch follows the stack trace below).

{code:java}
// My Code
ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
{code}

* I tested this on Flink 1.11.2, but I expect the problem also exists in later versions.
* Also, I'm a beginner, so if there is any problem with this discussion issue, please let me know and I'll fix it!

{code:java}
// Error Stacktrace
Caused by: TimerException{org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB}
	... 12 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
	at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
	at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:102)
	at <MY-CLASS>.onProcessingTime(ActionBatchTimeTrigger.java:29)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onProcessingTime(WindowOperator.java:902)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1220)
	... 11 more
Caused by: java.lang.NullPointerException
	at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:69)
	at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:32)
	at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
	... 18 more
{code}
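For context, here is a minimal, hypothetical trigger sketch that reaches the failing call the same way the stack trace above does. The class name, state name, TTL value, and trigger logic are illustrative only and are not taken from the actual job; the point is simply that a TTL-enabled ValueState in the RocksDB backend gets update(null) called from onProcessingTime:

{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Hypothetical trigger illustrating the failing call pattern from the report. */
public class BatchSizeTrigger extends Trigger<Object, TimeWindow> {

    private final ValueStateDescriptor<Integer> batchTotalSizeStateDesc;

    public BatchSizeTrigger() {
        batchTotalSizeStateDesc = new ValueStateDescriptor<>("batchTotalSize", Types.INT);
        // TTL is what makes update(null) fail: TtlValueState wraps the null
        // before RocksDBValueState gets a chance to treat it as clear().
        batchTotalSizeStateDesc.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.minutes(10)).build());
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Integer> size = ctx.getPartitionedState(batchTotalSizeStateDesc);
        Integer current = size.value();
        size.update(current == null ? 1 : current + 1);
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // With the RocksDB state backend and TTL enabled, this throws the
        // FlinkRuntimeException caused by a NullPointerException in IntSerializer.
        ctx.getPartitionedState(batchTotalSizeStateDesc).update(null);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(batchTotalSizeStateDesc).clear();
    }
}
{code}

Without enableTimeToLive(...), the same update(null) call completes and behaves like clear().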
h2. Reason

* It relates to RocksDBValueState being wrapped by TtlValueState.
* In RocksDBValueState (as in the other RocksDB state implementations), *update(null)* is supposed to be caught by the null check shown below. With TTL enabled, however, that check is skipped and the state tries to serialize the null value.

{code:java}
// https://github.com/apache/flink/blob/release-1.11/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L96-L110
@Override
public void update(V value) {
    if (value == null) {
        clear();
        return;
    }

    try {
        backend.db.put(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace(), serializeValue(value));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
    }
}
{code}

* This is because TtlValueState wraps the (null) value together with the last access timestamp, producing a new, non-null TtlValue object that carries the null user value, so the null check above never triggers.

{code:java}
// https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L47-L51
@Override
public void update(T value) throws IOException {
    accessCallback.run();
    original.update(wrapWithTs(value));
}
{code}

{code:java}
// https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java#L46-L48
static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    return new TtlValue<>(value, ts);
}
{code}

{code:java}
// https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
public class TtlValue<T> implements Serializable {
    private static final long serialVersionUID = 5221129704201125020L;

    @Nullable
    private final T userValue;
    private final long lastAccessTimestamp;

    public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
        this.userValue = userValue;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }

    @Nullable
    public T getUserValue() {
        return userValue;
    }

    public long getLastAccessTimestamp() {
        return lastAccessTimestamp;
    }
}
{code}

* In conclusion, I think the null-checking logic has to be changed so that it also covers the case where the userValue inside TtlValue is null, not just the wrapper itself (a rough sketch of one possible direction follows below).
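To make that suggestion concrete, here is a rough sketch of one possible direction, assuming the check is added in TtlValueState#update so that update(null) is forwarded to clear() on the wrapped state before the value ever gets wrapped. This is only an illustration of the idea, not necessarily the actual fix:

{code:java}
// Sketch only (not actual Flink code): short-circuit null in the TTL wrapper,
// so a null user value never reaches the underlying RocksDBValueState/serializer.
@Override
public void update(T value) throws IOException {
    accessCallback.run();
    if (value == null) {
        // Mirror the plain ValueState contract: update(null) behaves like clear().
        original.clear();
        return;
    }
    original.update(wrapWithTs(value));
}
{code}

An alternative would be to keep the wrapping as-is and instead check TtlValue#getUserValue() for null further down the stack, but handling it in the wrapper keeps the behavior consistent with the non-TTL state.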
I hope this is helpful for improving Flink, and if I get the chance, I'd like to fix it myself! Thank you, and have a happy Christmas, all!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)