[jira] [Created] (FLINK-6061) NPE on TypeSerializer.serialize with a RocksDBStateBackend calling state.entries in the open() function


Shang Yuanchun (Jira)
Vladislav Pernin created FLINK-6061:
---------------------------------------

             Summary: NPE on TypeSerializer.serialize with a RocksDBStateBackend calling state.entries in the open() function
                 Key: FLINK-6061
                 URL: https://issues.apache.org/jira/browse/FLINK-6061
             Project: Flink
          Issue Type: Bug
          Components: DataStream API, State Backends, Checkpointing, Streaming
    Affects Versions: 1.3.0
            Reporter: Vladislav Pernin


With the default (heap) state backend, the call to state.entries() "nicely fails" with an IllegalStateException:
{noformat}
Caused by: java.lang.IllegalStateException: No key set.
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
        at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
        at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

With a RocksDBStateBackend, it fails with an NPE:
{noformat}
Caused by: java.lang.NullPointerException
        at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
        at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
        at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
        at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
        at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
        at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
        at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

The record handed to the serializer is null because backend.getCurrentKey() returns null (no key has been set yet) in AbstractRocksDBState.
This may also be the case for other RocksDB state implementations.
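
The failure mode can be illustrated without Flink. The following is a minimal sketch in plain Java, assuming the serializer ends up writing a boxed Short to a data output: unboxing a null key throws the NullPointerException, whereas an explicit check gives the clearer IllegalStateException seen with the heap backend. The names NullKeySketch, writeKeyUnguarded, writeKeyGuarded and failureFor are hypothetical and are not Flink APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class NullKeySketch {

    // Hypothetical stand-in for the serializer call: writeShort takes a
    // primitive, so a null Short fails with an NPE when it is unboxed.
    static void writeKeyUnguarded(Short currentKey, DataOutputStream out) throws IOException {
        out.writeShort(currentKey);
    }

    // Same write, but with an explicit precondition on the key.
    static void writeKeyGuarded(Short currentKey, DataOutputStream out) throws IOException {
        if (currentKey == null) {
            throw new IllegalStateException("No key set.");
        }
        out.writeShort(currentKey);
    }

    // Returns the simple class name of the exception thrown for a null key.
    static String failureFor(boolean guarded) {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        try {
            if (guarded) {
                writeKeyGuarded(null, out);
            } else {
                writeKeyUnguarded(null, out);
            }
            return "none";
        } catch (RuntimeException | IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + failureFor(false));
        System.out.println("guarded:   " + failureFor(true));
    }
}
```

A check of this kind in the RocksDB code path would presumably make both backends fail in the same way; where exactly it belongs is left open here.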

A reproducer based on 1.3-SNAPSHOT (needed for MapState) is available here:
https://github.com/vpernin/flink-rocksdbstate-npe

The reproducer is a nonsensical application. There is no MapState with TTL or expiration yet, so the goal is to shrink or expire the state at some interval.
This could be done by iterating over the entries of the state and removing some of them.
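
As a rough illustration of that idea, here is a sketch in plain Java in which a java.util.Map stands in for the MapState and each value is assumed to hold a last-update timestamp in milliseconds. The class ExpireEntriesSketch and its expire method are hypothetical and only show the iterate-and-remove pattern, not how it would be wired into Flink.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class ExpireEntriesSketch {

    // Removes every entry whose timestamp is older than maxAgeMillis
    // relative to 'now' and returns the number of removed entries.
    static <K> int expire(Map<K, Long> lastSeen, long now, long maxAgeMillis) {
        int removed = 0;
        Iterator<Map.Entry<K, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Long> entry = it.next();
            if (now - entry.getValue() > maxAgeMillis) {
                it.remove(); // remove through the iterator while iterating
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> lastSeen = new HashMap<>();
        lastSeen.put("a", 1_000L);
        lastSeen.put("b", 9_500L);
        int removed = expire(lastSeen, 10_000L, 5_000L);
        System.out.println(removed + " expired, left: " + lastSeen.keySet());
    }
}
```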

This probably cannot be done in the open() method of a rich function.
I also tried implementing CheckpointListener and accessing the state content in the notifyCheckpointComplete() method, but that fails too, I guess because of the asynchronous nature of the checkpoint.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Re: [jira] [Created] (FLINK-6061) NPE on TypeSerializer.serialize with a RocksDBStateBackend calling state.entries in the open() function

Vladislav Pernin
What would be a better, less dirty way of expiring a MapState or, more
generally, of accessing the entries of a keyed state at some interval?
The currentKey has to be set in the AbstractRocksDBState.

Incrementing a counter in the rich function and doing the job whenever the
counter hits some modulo does not really feel right.
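
For reference, the counter-and-modulo workaround looks roughly like the sketch below, in plain Java with a java.util.Map standing in for the MapState. The class CounterCleanupSketch, the method onRecord and the parameters cleanupEvery and maxAgeMillis are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class CounterCleanupSketch {
    private final Map<String, Long> lastSeen = new HashMap<>();
    private final int cleanupEvery;
    private final long maxAgeMillis;
    private long processed = 0;

    CounterCleanupSketch(int cleanupEvery, long maxAgeMillis) {
        this.cleanupEvery = cleanupEvery;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Called once per record, the way a map() or flatMap() method would be.
    void onRecord(String mapKey, long timestamp) {
        lastSeen.put(mapKey, timestamp);
        processed++;
        // Every cleanupEvery-th record, drop entries that are too old.
        if (processed % cleanupEvery == 0) {
            lastSeen.values().removeIf(seen -> timestamp - seen > maxAgeMillis);
        }
    }

    int size() {
        return lastSeen.size();
    }

    public static void main(String[] args) {
        CounterCleanupSketch sketch = new CounterCleanupSketch(3, 5_000L);
        sketch.onRecord("a", 1_000L);
        sketch.onRecord("b", 2_000L);
        sketch.onRecord("c", 9_000L); // third record triggers the cleanup
        System.out.println("entries left: " + sketch.size());
    }
}
```

Inside a per-record method the current key is set, so state access is legal there, but only the state belonging to that record's key is visible, and cleanup happens only while records keep arriving.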

2017-03-15 11:44 GMT+01:00 Vladislav Pernin (JIRA) <[hidden email]>:

> Vladislav Pernin created FLINK-6061:
> ---------------------------------------