[DISCUSS] FLIP-153: Support state access in Python DataStream API

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

[DISCUSS] FLIP-153: Support state access in Python DataStream API

Shuiqiang Chen
Hi devs,

In FLIP-130, we have already supported Python DataStream stateless APIs so
that users are able to perform some basic data transformations. To
implement more complex data processing, we need to provide state access
support. So I would propose to add state access APIs in Python DataStream
API to support stateful operations on a KeyedStream. More details are in
the FLIP wiki page [1].

Any feedback will be highly appreciated!

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API

Best,
Shuiqiang
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Xingbo Huang
Thanks Shuiqiang for starting this discussion.

Big +1 for this feature. State access support can further improve the
functionality of our existing Python DataStream.

I have 2 comments regarding to the design doc:

a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
instead of letting each implementation class hold `typeInfo` itself.For
example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
`ListTypeInfo(elem_type_info)` to the construct method of `StateDescriptor`.

b) I think we need to add the `MergingState` and `AppendingState`
interfaces, and then extract the `get` and `add` methods from `ListState`,
`AggregatingState`, and `ReducingState` into `AppendingState`. Then let
`ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.

Best,
Xingbo

Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:

> Hi devs,
>
> In FLIP-130, we have already supported Python DataStream stateless APIs so
> that users are able to perform some basic data transformations. To
> implement more complex data processing, we need to provide state access
> support. So I would propose to add state access APIs in Python DataStream
> API to support stateful operations on a KeyedStream. More details are in
> the FLIP wiki page [1].
>
> Any feedback will be highly appreciated!
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>
> Best,
> Shuiqiang
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Shuiqiang Chen
Hi Xingbo,

Thank you for your valuable suggestions.

Indeed, we need to provide clearer abstractions for StateDescriptor and State APIs, I have updated the FLIP accordingly. Looking forward to your feedbacks!

Best,
Shuiqiang

> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
>
> Thanks Shuiqiang for starting this discussion.
>
> Big +1 for this feature. State access support can further improve the
> functionality of our existing Python DataStream.
>
> I have 2 comments regarding to the design doc:
>
> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
> instead of letting each implementation class hold `typeInfo` itself.For
> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
> `ListTypeInfo(elem_type_info)` to the construct method of `StateDescriptor`.
>
> b) I think we need to add the `MergingState` and `AppendingState`
> interfaces, and then extract the `get` and `add` methods from `ListState`,
> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>
> Best,
> Xingbo
>
> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
>
>> Hi devs,
>>
>> In FLIP-130, we have already supported Python DataStream stateless APIs so
>> that users are able to perform some basic data transformations. To
>> implement more complex data processing, we need to provide state access
>> support. So I would propose to add state access APIs in Python DataStream
>> API to support stateful operations on a KeyedStream. More details are in
>> the FLIP wiki page [1].
>>
>> Any feedback will be highly appreciated!
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>
>> Best,
>> Shuiqiang
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Yun Tang
Hi Shuiqiang,

Thanks for driving this. I have several questions below:


  1.  Thread safety of state write-access. As you might know, state access is not thread-safe [1] in Flink, we depend on task single thread access. Since you change the state access to another async thread, can we still ensure this? It also includes not allow user to access state in its java operator along with the bundled python operator.
  2.  Number of keyed state backend per task. Flink would only have one keyed state-backend per operator and would only have one keyed state backend per operator chain (in the head operator if possible). However, once we use experimental features such as reinterpretAsKeyedStream [2], we could have two keyed state-backend in one operator chain within one task. Can python datastream API could handle this well?
  3.  Time to set current key. As we still need current key when registering timer [3], we need some place to hole the current key even not registered in keyed state backend.
  4.  State migration. Flink supports to migrate state automatically if new provided serializer is compatible with old serializer[4]. I'm afraid if python data stream API wraps user's serializer as BytePrimitiveArraySerializer, we will lose such functionality. Moreover, RocksDB will migrate state automatically on java side [5] and this will break if python related bytes involved.
  5.  Queryable state client. Currently, we only have java-based queryable state client [6], and we need another python-based queryable state client if involved python bytes.

[1] https://issues.apache.org/jira/browse/FLINK-13072
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[3] https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
[4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
[5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
[6] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example

Best
Yun Tang


________________________________
From: Shuiqiang Chen <[hidden email]>
Sent: Wednesday, December 16, 2020 17:32
To: [hidden email] <[hidden email]>
Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Hi Xingbo,

Thank you for your valuable suggestions.

Indeed, we need to provide clearer abstractions for StateDescriptor and State APIs, I have updated the FLIP accordingly. Looking forward to your feedbacks!

Best,
Shuiqiang

> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
>
> Thanks Shuiqiang for starting this discussion.
>
> Big +1 for this feature. State access support can further improve the
> functionality of our existing Python DataStream.
>
> I have 2 comments regarding to the design doc:
>
> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
> instead of letting each implementation class hold `typeInfo` itself.For
> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
> `ListTypeInfo(elem_type_info)` to the construct method of `StateDescriptor`.
>
> b) I think we need to add the `MergingState` and `AppendingState`
> interfaces, and then extract the `get` and `add` methods from `ListState`,
> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>
> Best,
> Xingbo
>
> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
>
>> Hi devs,
>>
>> In FLIP-130, we have already supported Python DataStream stateless APIs so
>> that users are able to perform some basic data transformations. To
>> implement more complex data processing, we need to provide state access
>> support. So I would propose to add state access APIs in Python DataStream
>> API to support stateful operations on a KeyedStream. More details are in
>> the FLIP wiki page [1].
>>
>> Any feedback will be highly appreciated!
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>
>> Best,
>> Shuiqiang
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Shuiqiang Chen
Hi Yun,

Highly appreciate for your questions! I have the corresponding answers as bellow:

Re 1: You are right that the state access occurs in an async thread. However, all the state access will be synchrouzed in the Java operator and so there will be no concurrent access to the state backend.

Re 2: I think it could be handled well in Python DataStream API. In this case, there will be two operators and so also two keyed state backend.

Re 3: Sure, you are right. We will store the current key which may be used by the timer.

Re 4: Good point. State migration is still not covered in the current FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal to this FLIP. I have updated the FLIP and added clear description for this.

Re 5: Good point. We may need to introduce a Python querable state client if we want to support Queryable state for Python operators. I'd like to cover it in a separate FLIP. I have updated the FLIP and add it as a future work.

Best,
Shuiqiang

> 在 2020年12月17日,下午12:08,Yun Tang <[hidden email]> 写道:
>
> Hi Shuiqiang,
>
> Thanks for driving this. I have several questions below:
>
>
>  1.  Thread safety of state write-access. As you might know, state access is not thread-safe [1] in Flink, we depend on task single thread access. Since you change the state access to another async thread, can we still ensure this? It also includes not allow user to access state in its java operator along with the bundled python operator.
>  2.  Number of keyed state backend per task. Flink would only have one keyed state-backend per operator and would only have one keyed state backend per operator chain (in the head operator if possible). However, once we use experimental features such as reinterpretAsKeyedStream [2], we could have two keyed state-backend in one operator chain within one task. Can python datastream API could handle this well?
>  3.  Time to set current key. As we still need current key when registering timer [3], we need some place to hole the current key even not registered in keyed state backend.
>  4.  State migration. Flink supports to migrate state automatically if new provided serializer is compatible with old serializer[4]. I'm afraid if python data stream API wraps user's serializer as BytePrimitiveArraySerializer, we will lose such functionality. Moreover, RocksDB will migrate state automatically on java side [5] and this will break if python related bytes involved.
>  5.  Queryable state client. Currently, we only have java-based queryable state client [6], and we need another python-based queryable state client if involved python bytes.
>
> [1] https://issues.apache.org/jira/browse/FLINK-13072
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> [3] https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
> [5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
> [6] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>
> Best
> Yun Tang
>
>
> ________________________________
> From: Shuiqiang Chen <[hidden email]>
> Sent: Wednesday, December 16, 2020 17:32
> To: [hidden email] <[hidden email]>
> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API
>
> Hi Xingbo,
>
> Thank you for your valuable suggestions.
>
> Indeed, we need to provide clearer abstractions for StateDescriptor and State APIs, I have updated the FLIP accordingly. Looking forward to your feedbacks!
>
> Best,
> Shuiqiang
>
>> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
>>
>> Thanks Shuiqiang for starting this discussion.
>>
>> Big +1 for this feature. State access support can further improve the
>> functionality of our existing Python DataStream.
>>
>> I have 2 comments regarding to the design doc:
>>
>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
>> instead of letting each implementation class hold `typeInfo` itself.For
>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
>> `ListTypeInfo(elem_type_info)` to the construct method of `StateDescriptor`.
>>
>> b) I think we need to add the `MergingState` and `AppendingState`
>> interfaces, and then extract the `get` and `add` methods from `ListState`,
>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
>> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>>
>> Best,
>> Xingbo
>>
>> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
>>
>>> Hi devs,
>>>
>>> In FLIP-130, we have already supported Python DataStream stateless APIs so
>>> that users are able to perform some basic data transformations. To
>>> implement more complex data processing, we need to provide state access
>>> support. So I would propose to add state access APIs in Python DataStream
>>> API to support stateful operations on a KeyedStream. More details are in
>>> the FLIP wiki page [1].
>>>
>>> Any feedback will be highly appreciated!
>>>
>>> [1]
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>
>>> Best,
>>> Shuiqiang
>>>
>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Wei Zhong-2
Hi Shuiqiang,

Thanks for driving this. +1 for this feature, just a minor comment to the design doc.

The interface of the `AppendingState` should be:

class AppendingState(State, Generic[IN, OUT]):

   @abstractmethod
   def get(self) -> OUT:
       pass

   @abstractmethod
   def add(self, value: IN) -> None:
       pass

The output type and the input type of the `AppendingState` maybe different. And the definition of the child classes should be:

class MergingState(AppendingState[IN, OUT]):
    pass


class ListState(MergingState[T, Iterable[T]]):

   @abstractmethod
   def update(self, values: List[T]) -> None:
       pass

   @abstractmethod
   def add_all(self, values: List[T]) -> None:
       pass

   def __iter__(self) -> Iterator[T]:
       return iter(self.get())

Best,
Wei

> 在 2020年12月17日,21:06,Shuiqiang Chen <[hidden email]> 写道:
>
> Hi Yun,
>
> Highly appreciate for your questions! I have the corresponding answers as bellow:
>
> Re 1: You are right that the state access occurs in an async thread. However, all the state access will be synchrouzed in the Java operator and so there will be no concurrent access to the state backend.
>
> Re 2: I think it could be handled well in Python DataStream API. In this case, there will be two operators and so also two keyed state backend.
>
> Re 3: Sure, you are right. We will store the current key which may be used by the timer.
>
> Re 4: Good point. State migration is still not covered in the current FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal to this FLIP. I have updated the FLIP and added clear description for this.
>
> Re 5: Good point. We may need to introduce a Python querable state client if we want to support Queryable state for Python operators. I'd like to cover it in a separate FLIP. I have updated the FLIP and add it as a future work.
>
> Best,
> Shuiqiang
>
>> 在 2020年12月17日,下午12:08,Yun Tang <[hidden email]> 写道:
>>
>> Hi Shuiqiang,
>>
>> Thanks for driving this. I have several questions below:
>>
>>
>> 1.  Thread safety of state write-access. As you might know, state access is not thread-safe [1] in Flink, we depend on task single thread access. Since you change the state access to another async thread, can we still ensure this? It also includes not allow user to access state in its java operator along with the bundled python operator.
>> 2.  Number of keyed state backend per task. Flink would only have one keyed state-backend per operator and would only have one keyed state backend per operator chain (in the head operator if possible). However, once we use experimental features such as reinterpretAsKeyedStream [2], we could have two keyed state-backend in one operator chain within one task. Can python datastream API could handle this well?
>> 3.  Time to set current key. As we still need current key when registering timer [3], we need some place to hole the current key even not registered in keyed state backend.
>> 4.  State migration. Flink supports to migrate state automatically if new provided serializer is compatible with old serializer[4]. I'm afraid if python data stream API wraps user's serializer as BytePrimitiveArraySerializer, we will lose such functionality. Moreover, RocksDB will migrate state automatically on java side [5] and this will break if python related bytes involved.
>> 5.  Queryable state client. Currently, we only have java-based queryable state client [6], and we need another python-based queryable state client if involved python bytes.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>> [3] https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>> [5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>> [6] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>
>> Best
>> Yun Tang
>>
>>
>> ________________________________
>> From: Shuiqiang Chen <[hidden email]>
>> Sent: Wednesday, December 16, 2020 17:32
>> To: [hidden email] <[hidden email]>
>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API
>>
>> Hi Xingbo,
>>
>> Thank you for your valuable suggestions.
>>
>> Indeed, we need to provide clearer abstractions for StateDescriptor and State APIs, I have updated the FLIP accordingly. Looking forward to your feedbacks!
>>
>> Best,
>> Shuiqiang
>>
>>> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
>>>
>>> Thanks Shuiqiang for starting this discussion.
>>>
>>> Big +1 for this feature. State access support can further improve the
>>> functionality of our existing Python DataStream.
>>>
>>> I have 2 comments regarding to the design doc:
>>>
>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
>>> instead of letting each implementation class hold `typeInfo` itself.For
>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
>>> `ListTypeInfo(elem_type_info)` to the construct method of `StateDescriptor`.
>>>
>>> b) I think we need to add the `MergingState` and `AppendingState`
>>> interfaces, and then extract the `get` and `add` methods from `ListState`,
>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
>>> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>>>
>>> Best,
>>> Xingbo
>>>
>>> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
>>>
>>>> Hi devs,
>>>>
>>>> In FLIP-130, we have already supported Python DataStream stateless APIs so
>>>> that users are able to perform some basic data transformations. To
>>>> implement more complex data processing, we need to provide state access
>>>> support. So I would propose to add state access APIs in Python DataStream
>>>> API to support stateful operations on a KeyedStream. More details are in
>>>> the FLIP wiki page [1].
>>>>
>>>> Any feedback will be highly appreciated!
>>>>
>>>> [1]
>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>
>>>> Best,
>>>> Shuiqiang
>>>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Shuiqiang Chen
Hi wei,

Big thanks for pointing out the mistakes! I have updated the FLIP according to your suggestions.

Best,
Shuiqiang

> 在 2020年12月18日,下午2:37,Wei Zhong <[hidden email]> 写道:
>
> Hi Shuiqiang,
>
> Thanks for driving this. +1 for this feature, just a minor comment to the design doc.
>
> The interface of the `AppendingState` should be:
>
> class AppendingState(State, Generic[IN, OUT]):
>
>   @abstractmethod
>   def get(self) -> OUT:
>       pass
>
>   @abstractmethod
>   def add(self, value: IN) -> None:
>       pass
>
> The output type and the input type of the `AppendingState` maybe different. And the definition of the child classes should be:
>
> class MergingState(AppendingState[IN, OUT]):
>    pass
>
>
> class ListState(MergingState[T, Iterable[T]]):
>
>   @abstractmethod
>   def update(self, values: List[T]) -> None:
>       pass
>
>   @abstractmethod
>   def add_all(self, values: List[T]) -> None:
>       pass
>
>   def __iter__(self) -> Iterator[T]:
>       return iter(self.get())
>
> Best,
> Wei
>
>> 在 2020年12月17日,21:06,Shuiqiang Chen <[hidden email]> 写道:
>>
>> Hi Yun,
>>
>> Highly appreciate for your questions! I have the corresponding answers as bellow:
>>
>> Re 1: You are right that the state access occurs in an async thread. However, all the state access will be synchrouzed in the Java operator and so there will be no concurrent access to the state backend.
>>
>> Re 2: I think it could be handled well in Python DataStream API. In this case, there will be two operators and so also two keyed state backend.
>>
>> Re 3: Sure, you are right. We will store the current key which may be used by the timer.
>>
>> Re 4: Good point. State migration is still not covered in the current FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal to this FLIP. I have updated the FLIP and added clear description for this.
>>
>> Re 5: Good point. We may need to introduce a Python querable state client if we want to support Queryable state for Python operators. I'd like to cover it in a separate FLIP. I have updated the FLIP and add it as a future work.
>>
>> Best,
>> Shuiqiang
>>
>>> 在 2020年12月17日,下午12:08,Yun Tang <[hidden email]> 写道:
>>>
>>> Hi Shuiqiang,
>>>
>>> Thanks for driving this. I have several questions below:
>>>
>>>
>>> 1.  Thread safety of state write-access. As you might know, state access is not thread-safe [1] in Flink, we depend on task single thread access. Since you change the state access to another async thread, can we still ensure this? It also includes not allow user to access state in its java operator along with the bundled python operator.
>>> 2.  Number of keyed state backend per task. Flink would only have one keyed state-backend per operator and would only have one keyed state backend per operator chain (in the head operator if possible). However, once we use experimental features such as reinterpretAsKeyedStream [2], we could have two keyed state-backend in one operator chain within one task. Can python datastream API could handle this well?
>>> 3.  Time to set current key. As we still need current key when registering timer [3], we need some place to hole the current key even not registered in keyed state backend.
>>> 4.  State migration. Flink supports to migrate state automatically if new provided serializer is compatible with old serializer[4]. I'm afraid if python data stream API wraps user's serializer as BytePrimitiveArraySerializer, we will lose such functionality. Moreover, RocksDB will migrate state automatically on java side [5] and this will break if python related bytes involved.
>>> 5.  Queryable state client. Currently, we only have java-based queryable state client [6], and we need another python-based queryable state client if involved python bytes.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>> [3] https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>>> [4] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>>> [5] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>>> [6] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>>
>>> Best
>>> Yun Tang
>>>
>>>
>>> ________________________________
>>> From: Shuiqiang Chen <[hidden email]>
>>> Sent: Wednesday, December 16, 2020 17:32
>>> To: [hidden email] <[hidden email]>
>>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API
>>>
>>> Hi Xingbo,
>>>
>>> Thank you for your valuable suggestions.
>>>
>>> Indeed, we need to provide clearer abstractions for StateDescriptor and State APIs, I have updated the FLIP accordingly. Looking forward to your feedbacks!
>>>
>>> Best,
>>> Shuiqiang
>>>
>>>> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
>>>>
>>>> Thanks Shuiqiang for starting this discussion.
>>>>
>>>> Big +1 for this feature. State access support can further improve the
>>>> functionality of our existing Python DataStream.
>>>>
>>>> I have 2 comments regarding to the design doc:
>>>>
>>>> a) I think that `StateDescriptor` needs to hold the variable `typeInfo`
>>>> instead of letting each implementation class hold `typeInfo` itself.For
>>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but passes
>>>> `ListTypeInfo(elem_type_info)` to the construct method of `StateDescriptor`.
>>>>
>>>> b) I think we need to add the `MergingState` and `AppendingState`
>>>> interfaces, and then extract the `get` and `add` methods from `ListState`,
>>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then let
>>>> `ListState`, `AggregatingState` and `ReducingState` inherit `MergingState`.
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
>>>>
>>>>> Hi devs,
>>>>>
>>>>> In FLIP-130, we have already supported Python DataStream stateless APIs so
>>>>> that users are able to perform some basic data transformations. To
>>>>> implement more complex data processing, we need to provide state access
>>>>> support. So I would propose to add state access APIs in Python DataStream
>>>>> API to support stateful operations on a KeyedStream. More details are in
>>>>> the FLIP wiki page [1].
>>>>>
>>>>> Any feedback will be highly appreciated!
>>>>>
>>>>> [1]
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Yu Li
Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.

*bq. However, all the state access will be synchronized in the Java
operator and so there will be no concurrent access to the state backend.*
Could you add a section to explicitly mention this in the FLIP document? I
think single-threaded state access is an important prerequisite and it's
important for later contributors to know about this clearly, from both the
design doc and source codes.

The other parts LGTM, added some minor inline comments in the FLIP, please
take a look.

Thanks.

Best Regards,
Yu


On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <[hidden email]> wrote:

> Hi wei,
>
> Big thanks for pointing out the mistakes! I have updated the FLIP
> according to your suggestions.
>
> Best,
> Shuiqiang
>
> > 在 2020年12月18日,下午2:37,Wei Zhong <[hidden email]> 写道:
> >
> > Hi Shuiqiang,
> >
> > Thanks for driving this. +1 for this feature, just a minor comment to
> the design doc.
> >
> > The interface of the `AppendingState` should be:
> >
> > class AppendingState(State, Generic[IN, OUT]):
> >
> >   @abstractmethod
> >   def get(self) -> OUT:
> >       pass
> >
> >   @abstractmethod
> >   def add(self, value: IN) -> None:
> >       pass
> >
> > The output type and the input type of the `AppendingState` maybe
> different. And the definition of the child classes should be:
> >
> > class MergingState(AppendingState[IN, OUT]):
> >    pass
> >
> >
> > class ListState(MergingState[T, Iterable[T]]):
> >
> >   @abstractmethod
> >   def update(self, values: List[T]) -> None:
> >       pass
> >
> >   @abstractmethod
> >   def add_all(self, values: List[T]) -> None:
> >       pass
> >
> >   def __iter__(self) -> Iterator[T]:
> >       return iter(self.get())
> >
> > Best,
> > Wei
> >
> >> 在 2020年12月17日,21:06,Shuiqiang Chen <[hidden email]> 写道:
> >>
> >> Hi Yun,
> >>
> >> Highly appreciate for your questions! I have the corresponding answers
> as bellow:
> >>
> >> Re 1: You are right that the state access occurs in an async thread.
> However, all the state access will be synchrouzed in the Java operator and
> so there will be no concurrent access to the state backend.
> >>
> >> Re 2: I think it could be handled well in Python DataStream API. In
> this case, there will be two operators and so also two keyed state backend.
> >>
> >> Re 3: Sure, you are right. We will store the current key which may be
> used by the timer.
> >>
> >> Re 4: Good point. State migration is still not covered in the current
> FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal to
> this FLIP. I have updated the FLIP and added clear description for this.
> >>
> >> Re 5: Good point. We may need to introduce a Python querable state
> client if we want to support Queryable state for Python operators. I'd like
> to cover it in a separate FLIP. I have updated the FLIP and add it as a
> future work.
> >>
> >> Best,
> >> Shuiqiang
> >>
> >>> 在 2020年12月17日,下午12:08,Yun Tang <[hidden email]> 写道:
> >>>
> >>> Hi Shuiqiang,
> >>>
> >>> Thanks for driving this. I have several questions below:
> >>>
> >>>
> >>> 1.  Thread safety of state write-access. As you might know, state
> access is not thread-safe [1] in Flink, we depend on task single thread
> access. Since you change the state access to another async thread, can we
> still ensure this? It also includes not allow user to access state in its
> java operator along with the bundled python operator.
> >>> 2.  Number of keyed state backend per task. Flink would only have one
> keyed state-backend per operator and would only have one keyed state
> backend per operator chain (in the head operator if possible). However,
> once we use experimental features such as reinterpretAsKeyedStream [2], we
> could have two keyed state-backend in one operator chain within one task.
> Can python datastream API could handle this well?
> >>> 3.  Time to set current key. As we still need current key when
> registering timer [3], we need some place to hole the current key even not
> registered in keyed state backend.
> >>> 4.  State migration. Flink supports to migrate state automatically if
> new provided serializer is compatible with old serializer[4]. I'm afraid if
> python data stream API wraps user's serializer as
> BytePrimitiveArraySerializer, we will lose such functionality. Moreover,
> RocksDB will migrate state automatically on java side [5] and this will
> break if python related bytes involved.
> >>> 5.  Queryable state client. Currently, we only have java-based
> queryable state client [6], and we need another python-based queryable
> state client if involved python bytes.
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-13072
> >>> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> >>> [3]
> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
> >>> [4]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
> >>> [5]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
> >>> [6]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
> >>>
> >>> Best
> >>> Yun Tang
> >>>
> >>>
> >>> ________________________________
> >>> From: Shuiqiang Chen <[hidden email]>
> >>> Sent: Wednesday, December 16, 2020 17:32
> >>> To: [hidden email] <[hidden email]>
> >>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python
> DataStream API
> >>>
> >>> Hi Xingbo,
> >>>
> >>> Thank you for your valuable suggestions.
> >>>
> >>> Indeed, we need to provide clearer abstractions for StateDescriptor
> and State APIs, I have updated the FLIP accordingly. Looking forward to
> your feedbacks!
> >>>
> >>> Best,
> >>> Shuiqiang
> >>>
> >>>> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
> >>>>
> >>>> Thanks Shuiqiang for starting this discussion.
> >>>>
> >>>> Big +1 for this feature. State access support can further improve the
> >>>> functionality of our existing Python DataStream.
> >>>>
> >>>> I have 2 comments regarding to the design doc:
> >>>>
> >>>> a) I think that `StateDescriptor` needs to hold the variable
> `typeInfo`
> >>>> instead of letting each implementation class hold `typeInfo`
> itself.For
> >>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but
> passes
> >>>> `ListTypeInfo(elem_type_info)` to the construct method of
> `StateDescriptor`.
> >>>>
> >>>> b) I think we need to add the `MergingState` and `AppendingState`
> >>>> interfaces, and then extract the `get` and `add` methods from
> `ListState`,
> >>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then
> let
> >>>> `ListState`, `AggregatingState` and `ReducingState` inherit
> `MergingState`.
> >>>>
> >>>> Best,
> >>>> Xingbo
> >>>>
> >>>> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
> >>>>
> >>>>> Hi devs,
> >>>>>
> >>>>> In FLIP-130, we have already supported Python DataStream stateless
> APIs so
> >>>>> that users are able to perform some basic data transformations. To
> >>>>> implement more complex data processing, we need to provide state
> access
> >>>>> support. So I would propose to add state access APIs in Python
> DataStream
> >>>>> API to support stateful operations on a KeyedStream. More details
> are in
> >>>>> the FLIP wiki page [1].
> >>>>>
> >>>>> Any feedback will be highly appreciated!
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> >>>>>
> >>>>> Best,
> >>>>> Shuiqiang
> >>>>>
> >>>
> >>
> >
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Shuiqiang Chen
Hi Yu,

Thanks a lot for your suggestions.

I have addressed your inlined comments in the FLIP and also added a new
section "State backed access synchronization" that explains the way to make
sure there is no concurrent access to the state backend. Please have a look.

Best,
Shuiqiang


Yu Li <[hidden email]> 于2021年1月4日周一 下午4:15写道:

> Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.
>
> *bq. However, all the state access will be synchronized in the Java
> operator and so there will be no concurrent access to the state backend.*
> Could you add a section to explicitly mention this in the FLIP document? I
> think single-threaded state access is an important prerequisite and it's
> important for later contributors to know about this clearly, from both the
> design doc and source codes.
>
> The other parts LGTM, added some minor inline comments in the FLIP, please
> take a look.
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <[hidden email]> wrote:
>
> > Hi wei,
> >
> > Big thanks for pointing out the mistakes! I have updated the FLIP
> > according to your suggestions.
> >
> > Best,
> > Shuiqiang
> >
> > > 在 2020年12月18日,下午2:37,Wei Zhong <[hidden email]> 写道:
> > >
> > > Hi Shuiqiang,
> > >
> > > Thanks for driving this. +1 for this feature, just a minor comment to
> > the design doc.
> > >
> > > The interface of the `AppendingState` should be:
> > >
> > > class AppendingState(State, Generic[IN, OUT]):
> > >
> > >   @abstractmethod
> > >   def get(self) -> OUT:
> > >       pass
> > >
> > >   @abstractmethod
> > >   def add(self, value: IN) -> None:
> > >       pass
> > >
> > > The output type and the input type of the `AppendingState` maybe
> > different. And the definition of the child classes should be:
> > >
> > > class MergingState(AppendingState[IN, OUT]):
> > >    pass
> > >
> > >
> > > class ListState(MergingState[T, Iterable[T]]):
> > >
> > >   @abstractmethod
> > >   def update(self, values: List[T]) -> None:
> > >       pass
> > >
> > >   @abstractmethod
> > >   def add_all(self, values: List[T]) -> None:
> > >       pass
> > >
> > >   def __iter__(self) -> Iterator[T]:
> > >       return iter(self.get())
> > >
> > > Best,
> > > Wei
> > >
> > >> 在 2020年12月17日,21:06,Shuiqiang Chen <[hidden email]> 写道:
> > >>
> > >> Hi Yun,
> > >>
> > >> Highly appreciate for your questions! I have the corresponding answers
> > as bellow:
> > >>
> > >> Re 1: You are right that the state access occurs in an async thread.
> > However, all the state access will be synchrouzed in the Java operator
> and
> > so there will be no concurrent access to the state backend.
> > >>
> > >> Re 2: I think it could be handled well in Python DataStream API. In
> > this case, there will be two operators and so also two keyed state
> backend.
> > >>
> > >> Re 3: Sure, you are right. We will store the current key which may be
> > used by the timer.
> > >>
> > >> Re 4: Good point. State migration is still not covered in the current
> > FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal
> to
> > this FLIP. I have updated the FLIP and added clear description for this.
> > >>
> > >> Re 5: Good point. We may need to introduce a Python querable state
> > client if we want to support Queryable state for Python operators. I'd
> like
> > to cover it in a separate FLIP. I have updated the FLIP and add it as a
> > future work.
> > >>
> > >> Best,
> > >> Shuiqiang
> > >>
> > >>> 在 2020年12月17日,下午12:08,Yun Tang <[hidden email]> 写道:
> > >>>
> > >>> Hi Shuiqiang,
> > >>>
> > >>> Thanks for driving this. I have several questions below:
> > >>>
> > >>>
> > >>> 1.  Thread safety of state write-access. As you might know, state
> > access is not thread-safe [1] in Flink, we depend on task single thread
> > access. Since you change the state access to another async thread, can we
> > still ensure this? It also includes not allow user to access state in its
> > java operator along with the bundled python operator.
> > >>> 2.  Number of keyed state backend per task. Flink would only have one
> > keyed state-backend per operator and would only have one keyed state
> > backend per operator chain (in the head operator if possible). However,
> > once we use experimental features such as reinterpretAsKeyedStream [2],
> we
> > could have two keyed state-backend in one operator chain within one task.
> > Can python datastream API could handle this well?
> > >>> 3.  Time to set current key. As we still need current key when
> > registering timer [3], we need some place to hole the current key even
> not
> > registered in keyed state backend.
> > >>> 4.  State migration. Flink supports to migrate state automatically if
> > new provided serializer is compatible with old serializer[4]. I'm afraid
> if
> > python data stream API wraps user's serializer as
> > BytePrimitiveArraySerializer, we will lose such functionality. Moreover,
> > RocksDB will migrate state automatically on java side [5] and this will
> > break if python related bytes involved.
> > >>> 5.  Queryable state client. Currently, we only have java-based
> > queryable state client [6], and we need another python-based queryable
> > state client if involved python bytes.
> > >>>
> > >>> [1] https://issues.apache.org/jira/browse/FLINK-13072
> > >>> [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> > >>> [3]
> >
> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
> > >>> [4]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
> > >>> [5]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
> > >>> [6]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
> > >>>
> > >>> Best
> > >>> Yun Tang
> > >>>
> > >>>
> > >>> ________________________________
> > >>> From: Shuiqiang Chen <[hidden email]>
> > >>> Sent: Wednesday, December 16, 2020 17:32
> > >>> To: [hidden email] <[hidden email]>
> > >>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python
> > DataStream API
> > >>>
> > >>> Hi Xingbo,
> > >>>
> > >>> Thank you for your valuable suggestions.
> > >>>
> > >>> Indeed, we need to provide clearer abstractions for StateDescriptor
> > and State APIs, I have updated the FLIP accordingly. Looking forward to
> > your feedbacks!
> > >>>
> > >>> Best,
> > >>> Shuiqiang
> > >>>
> > >>>> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
> > >>>>
> > >>>> Thanks Shuiqiang for starting this discussion.
> > >>>>
> > >>>> Big +1 for this feature. State access support can further improve
> the
> > >>>> functionality of our existing Python DataStream.
> > >>>>
> > >>>> I have 2 comments regarding to the design doc:
> > >>>>
> > >>>> a) I think that `StateDescriptor` needs to hold the variable
> > `typeInfo`
> > >>>> instead of letting each implementation class hold `typeInfo`
> > itself.For
> > >>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but
> > passes
> > >>>> `ListTypeInfo(elem_type_info)` to the construct method of
> > `StateDescriptor`.
> > >>>>
> > >>>> b) I think we need to add the `MergingState` and `AppendingState`
> > >>>> interfaces, and then extract the `get` and `add` methods from
> > `ListState`,
> > >>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then
> > let
> > >>>> `ListState`, `AggregatingState` and `ReducingState` inherit
> > `MergingState`.
> > >>>>
> > >>>> Best,
> > >>>> Xingbo
> > >>>>
> > >>>> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
> > >>>>
> > >>>>> Hi devs,
> > >>>>>
> > >>>>> In FLIP-130, we have already supported Python DataStream stateless
> > APIs so
> > >>>>> that users are able to perform some basic data transformations. To
> > >>>>> implement more complex data processing, we need to provide state
> > access
> > >>>>> support. So I would propose to add state access APIs in Python
> > DataStream
> > >>>>> API to support stateful operations on a KeyedStream. More details
> > are in
> > >>>>> the FLIP wiki page [1].
> > >>>>>
> > >>>>> Any feedback will be highly appreciated!
> > >>>>>
> > >>>>> [1]
> > >>>>>
> > >>>>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
> > >>>>>
> > >>>>> Best,
> > >>>>> Shuiqiang
> > >>>>>
> > >>>
> > >>
> > >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Dian Fu-2
Thanks Shuiqiang for driving this.

The design looks good to me. +1 to start the vote if there are no more comments.

Regards,
Dian

> 在 2021年1月4日,下午7:40,Shuiqiang Chen <[hidden email]> 写道:
>
> Hi Yu,
>
> Thanks a lot for your suggestions.
>
> I have addressed your inlined comments in the FLIP and also added a new
> section "State backed access synchronization" that explains the way to make
> sure there is no concurrent access to the state backend. Please have a look.
>
> Best,
> Shuiqiang
>
>
> Yu Li <[hidden email]> 于2021年1月4日周一 下午4:15写道:
>
>> Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.
>>
>> *bq. However, all the state access will be synchronized in the Java
>> operator and so there will be no concurrent access to the state backend.*
>> Could you add a section to explicitly mention this in the FLIP document? I
>> think single-threaded state access is an important prerequisite and it's
>> important for later contributors to know about this clearly, from both the
>> design doc and source codes.
>>
>> The other parts LGTM, added some minor inline comments in the FLIP, please
>> take a look.
>>
>> Thanks.
>>
>> Best Regards,
>> Yu
>>
>>
>> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <[hidden email]> wrote:
>>
>>> Hi wei,
>>>
>>> Big thanks for pointing out the mistakes! I have updated the FLIP
>>> according to your suggestions.
>>>
>>> Best,
>>> Shuiqiang
>>>
>>>> 在 2020年12月18日,下午2:37,Wei Zhong <[hidden email]> 写道:
>>>>
>>>> Hi Shuiqiang,
>>>>
>>>> Thanks for driving this. +1 for this feature, just a minor comment to
>>> the design doc.
>>>>
>>>> The interface of the `AppendingState` should be:
>>>>
>>>> class AppendingState(State, Generic[IN, OUT]):
>>>>
>>>>  @abstractmethod
>>>>  def get(self) -> OUT:
>>>>      pass
>>>>
>>>>  @abstractmethod
>>>>  def add(self, value: IN) -> None:
>>>>      pass
>>>>
>>>> The output type and the input type of the `AppendingState` maybe
>>> different. And the definition of the child classes should be:
>>>>
>>>> class MergingState(AppendingState[IN, OUT]):
>>>>   pass
>>>>
>>>>
>>>> class ListState(MergingState[T, Iterable[T]]):
>>>>
>>>>  @abstractmethod
>>>>  def update(self, values: List[T]) -> None:
>>>>      pass
>>>>
>>>>  @abstractmethod
>>>>  def add_all(self, values: List[T]) -> None:
>>>>      pass
>>>>
>>>>  def __iter__(self) -> Iterator[T]:
>>>>      return iter(self.get())
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>>> 在 2020年12月17日,21:06,Shuiqiang Chen <[hidden email]> 写道:
>>>>>
>>>>> Hi Yun,
>>>>>
>>>>> Highly appreciate for your questions! I have the corresponding answers
>>> as bellow:
>>>>>
>>>>> Re 1: You are right that the state access occurs in an async thread.
>>> However, all the state access will be synchrouzed in the Java operator
>> and
>>> so there will be no concurrent access to the state backend.
>>>>>
>>>>> Re 2: I think it could be handled well in Python DataStream API. In
>>> this case, there will be two operators and so also two keyed state
>> backend.
>>>>>
>>>>> Re 3: Sure, you are right. We will store the current key which may be
>>> used by the timer.
>>>>>
>>>>> Re 4: Good point. State migration is still not covered in the current
>>> FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal
>> to
>>> this FLIP. I have updated the FLIP and added clear description for this.
>>>>>
>>>>> Re 5: Good point. We may need to introduce a Python querable state
>>> client if we want to support Queryable state for Python operators. I'd
>> like
>>> to cover it in a separate FLIP. I have updated the FLIP and add it as a
>>> future work.
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>>>> 在 2020年12月17日,下午12:08,Yun Tang <[hidden email]> 写道:
>>>>>>
>>>>>> Hi Shuiqiang,
>>>>>>
>>>>>> Thanks for driving this. I have several questions below:
>>>>>>
>>>>>>
>>>>>> 1.  Thread safety of state write-access. As you might know, state
>>> access is not thread-safe [1] in Flink, we depend on task single thread
>>> access. Since you change the state access to another async thread, can we
>>> still ensure this? It also includes not allow user to access state in its
>>> java operator along with the bundled python operator.
>>>>>> 2.  Number of keyed state backend per task. Flink would only have one
>>> keyed state-backend per operator and would only have one keyed state
>>> backend per operator chain (in the head operator if possible). However,
>>> once we use experimental features such as reinterpretAsKeyedStream [2],
>> we
>>> could have two keyed state-backend in one operator chain within one task.
>>> Can python datastream API could handle this well?
>>>>>> 3.  Time to set current key. As we still need current key when
>>> registering timer [3], we need some place to hole the current key even
>> not
>>> registered in keyed state backend.
>>>>>> 4.  State migration. Flink supports to migrate state automatically if
>>> new provided serializer is compatible with old serializer[4]. I'm afraid
>> if
>>> python data stream API wraps user's serializer as
>>> BytePrimitiveArraySerializer, we will lose such functionality. Moreover,
>>> RocksDB will migrate state automatically on java side [5] and this will
>>> break if python related bytes involved.
>>>>>> 5.  Queryable state client. Currently, we only have java-based
>>> queryable state client [6], and we need another python-based queryable
>>> state client if involved python bytes.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>>>>>> [2]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>>>>> [3]
>>>
>> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>>>>>> [4]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>>>>>> [5]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>>>>>> [6]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>>>>>
>>>>>> Best
>>>>>> Yun Tang
>>>>>>
>>>>>>
>>>>>> ________________________________
>>>>>> From: Shuiqiang Chen <[hidden email]>
>>>>>> Sent: Wednesday, December 16, 2020 17:32
>>>>>> To: [hidden email] <[hidden email]>
>>>>>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python
>>> DataStream API
>>>>>>
>>>>>> Hi Xingbo,
>>>>>>
>>>>>> Thank you for your valuable suggestions.
>>>>>>
>>>>>> Indeed, we need to provide clearer abstractions for StateDescriptor
>>> and State APIs, I have updated the FLIP accordingly. Looking forward to
>>> your feedbacks!
>>>>>>
>>>>>> Best,
>>>>>> Shuiqiang
>>>>>>
>>>>>>> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
>>>>>>>
>>>>>>> Thanks Shuiqiang for starting this discussion.
>>>>>>>
>>>>>>> Big +1 for this feature. State access support can further improve
>> the
>>>>>>> functionality of our existing Python DataStream.
>>>>>>>
>>>>>>> I have 2 comments regarding to the design doc:
>>>>>>>
>>>>>>> a) I think that `StateDescriptor` needs to hold the variable
>>> `typeInfo`
>>>>>>> instead of letting each implementation class hold `typeInfo`
>>> itself.For
>>>>>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but
>>> passes
>>>>>>> `ListTypeInfo(elem_type_info)` to the construct method of
>>> `StateDescriptor`.
>>>>>>>
>>>>>>> b) I think we need to add the `MergingState` and `AppendingState`
>>>>>>> interfaces, and then extract the `get` and `add` methods from
>>> `ListState`,
>>>>>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then
>>> let
>>>>>>> `ListState`, `AggregatingState` and `ReducingState` inherit
>>> `MergingState`.
>>>>>>>
>>>>>>> Best,
>>>>>>> Xingbo
>>>>>>>
>>>>>>> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
>>>>>>>
>>>>>>>> Hi devs,
>>>>>>>>
>>>>>>>> In FLIP-130, we have already supported Python DataStream stateless
>>> APIs so
>>>>>>>> that users are able to perform some basic data transformations. To
>>>>>>>> implement more complex data processing, we need to provide state
>>> access
>>>>>>>> support. So I would propose to add state access APIs in Python
>>> DataStream
>>>>>>>> API to support stateful operations on a KeyedStream. More details
>>> are in
>>>>>>>> the FLIP wiki page [1].
>>>>>>>>
>>>>>>>> Any feedback will be highly appreciated!
>>>>>>>>
>>>>>>>> [1]
>>>>>>>>
>>>>>>>>
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Shuiqiang
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Yun Tang
The design looks good to me now. +1 to start the vote if there are no more comments..

Best
Yun Tang
________________________________
From: Dian Fu <[hidden email]>
Sent: Tuesday, January 5, 2021 13:32
To: [hidden email] <[hidden email]>
Subject: Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

Thanks Shuiqiang for driving this.

The design looks good to me. +1 to start the vote if there are no more comments.

Regards,
Dian

> 在 2021年1月4日,下午7:40,Shuiqiang Chen <[hidden email]> 写道:
>
> Hi Yu,
>
> Thanks a lot for your suggestions.
>
> I have addressed your inlined comments in the FLIP and also added a new
> section "State backed access synchronization" that explains the way to make
> sure there is no concurrent access to the state backend. Please have a look.
>
> Best,
> Shuiqiang
>
>
> Yu Li <[hidden email]> 于2021年1月4日周一 下午4:15写道:
>
>> Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.
>>
>> *bq. However, all the state access will be synchronized in the Java
>> operator and so there will be no concurrent access to the state backend.*
>> Could you add a section to explicitly mention this in the FLIP document? I
>> think single-threaded state access is an important prerequisite and it's
>> important for later contributors to know about this clearly, from both the
>> design doc and source codes.
>>
>> The other parts LGTM, added some minor inline comments in the FLIP, please
>> take a look.
>>
>> Thanks.
>>
>> Best Regards,
>> Yu
>>
>>
>> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen <[hidden email]> wrote:
>>
>>> Hi wei,
>>>
>>> Big thanks for pointing out the mistakes! I have updated the FLIP
>>> according to your suggestions.
>>>
>>> Best,
>>> Shuiqiang
>>>
>>>> 在 2020年12月18日,下午2:37,Wei Zhong <[hidden email]> 写道:
>>>>
>>>> Hi Shuiqiang,
>>>>
>>>> Thanks for driving this. +1 for this feature, just a minor comment to
>>> the design doc.
>>>>
>>>> The interface of the `AppendingState` should be:
>>>>
>>>> class AppendingState(State, Generic[IN, OUT]):
>>>>
>>>>  @abstractmethod
>>>>  def get(self) -> OUT:
>>>>      pass
>>>>
>>>>  @abstractmethod
>>>>  def add(self, value: IN) -> None:
>>>>      pass
>>>>
>>>> The output type and the input type of the `AppendingState` maybe
>>> different. And the definition of the child classes should be:
>>>>
>>>> class MergingState(AppendingState[IN, OUT]):
>>>>   pass
>>>>
>>>>
>>>> class ListState(MergingState[T, Iterable[T]]):
>>>>
>>>>  @abstractmethod
>>>>  def update(self, values: List[T]) -> None:
>>>>      pass
>>>>
>>>>  @abstractmethod
>>>>  def add_all(self, values: List[T]) -> None:
>>>>      pass
>>>>
>>>>  def __iter__(self) -> Iterator[T]:
>>>>      return iter(self.get())
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>>> 在 2020年12月17日,21:06,Shuiqiang Chen <[hidden email]> 写道:
>>>>>
>>>>> Hi Yun,
>>>>>
>>>>> Highly appreciate for your questions! I have the corresponding answers
>>> as bellow:
>>>>>
>>>>> Re 1: You are right that the state access occurs in an async thread.
>>> However, all the state access will be synchrouzed in the Java operator
>> and
>>> so there will be no concurrent access to the state backend.
>>>>>
>>>>> Re 2: I think it could be handled well in Python DataStream API. In
>>> this case, there will be two operators and so also two keyed state
>> backend.
>>>>>
>>>>> Re 3: Sure, you are right. We will store the current key which may be
>>> used by the timer.
>>>>>
>>>>> Re 4: Good point. State migration is still not covered in the current
>>> FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal
>> to
>>> this FLIP. I have updated the FLIP and added clear description for this.
>>>>>
>>>>> Re 5: Good point. We may need to introduce a Python querable state
>>> client if we want to support Queryable state for Python operators. I'd
>> like
>>> to cover it in a separate FLIP. I have updated the FLIP and add it as a
>>> future work.
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>>>> 在 2020年12月17日,下午12:08,Yun Tang <[hidden email]> 写道:
>>>>>>
>>>>>> Hi Shuiqiang,
>>>>>>
>>>>>> Thanks for driving this. I have several questions below:
>>>>>>
>>>>>>
>>>>>> 1.  Thread safety of state write-access. As you might know, state
>>> access is not thread-safe [1] in Flink, we depend on task single thread
>>> access. Since you change the state access to another async thread, can we
>>> still ensure this? It also includes not allow user to access state in its
>>> java operator along with the bundled python operator.
>>>>>> 2.  Number of keyed state backend per task. Flink would only have one
>>> keyed state-backend per operator and would only have one keyed state
>>> backend per operator chain (in the head operator if possible). However,
>>> once we use experimental features such as reinterpretAsKeyedStream [2],
>> we
>>> could have two keyed state-backend in one operator chain within one task.
>>> Can python datastream API could handle this well?
>>>>>> 3.  Time to set current key. As we still need current key when
>>> registering timer [3], we need some place to hole the current key even
>> not
>>> registered in keyed state backend.
>>>>>> 4.  State migration. Flink supports to migrate state automatically if
>>> new provided serializer is compatible with old serializer[4]. I'm afraid
>> if
>>> python data stream API wraps user's serializer as
>>> BytePrimitiveArraySerializer, we will lose such functionality. Moreover,
>>> RocksDB will migrate state automatically on java side [5] and this will
>>> break if python related bytes involved.
>>>>>> 5.  Queryable state client. Currently, we only have java-based
>>> queryable state client [6], and we need another python-based queryable
>>> state client if involved python bytes.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-13072
>>>>>> [2]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>>>>> [3]
>>>
>> https://github.com/apache/flink/blob/58cc2a5fbd419d6a9e4f9c251ac01ecf59a8c5a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L203
>>>>>> [4]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#evolving-state-schema
>>>>>> [5]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/custom_serialization.html#off-heap-state-backends-eg-rocksdbstatebackend
>>>>>> [6]
>>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#example
>>>>>>
>>>>>> Best
>>>>>> Yun Tang
>>>>>>
>>>>>>
>>>>>> ________________________________
>>>>>> From: Shuiqiang Chen <[hidden email]>
>>>>>> Sent: Wednesday, December 16, 2020 17:32
>>>>>> To: [hidden email] <[hidden email]>
>>>>>> Subject: Re: [DISCUSS] FLIP-153: Support state access in Python
>>> DataStream API
>>>>>>
>>>>>> Hi Xingbo,
>>>>>>
>>>>>> Thank you for your valuable suggestions.
>>>>>>
>>>>>> Indeed, we need to provide clearer abstractions for StateDescriptor
>>> and State APIs, I have updated the FLIP accordingly. Looking forward to
>>> your feedbacks!
>>>>>>
>>>>>> Best,
>>>>>> Shuiqiang
>>>>>>
>>>>>>> 在 2020年12月14日,上午11:27,Xingbo Huang <[hidden email]> 写道:
>>>>>>>
>>>>>>> Thanks Shuiqiang for starting this discussion.
>>>>>>>
>>>>>>> Big +1 for this feature. State access support can further improve
>> the
>>>>>>> functionality of our existing Python DataStream.
>>>>>>>
>>>>>>> I have 2 comments regarding to the design doc:
>>>>>>>
>>>>>>> a) I think that `StateDescriptor` needs to hold the variable
>>> `typeInfo`
>>>>>>> instead of letting each implementation class hold `typeInfo`
>>> itself.For
>>>>>>> example, `ListStateDescriptor` does not hold `elem_type_info`, but
>>> passes
>>>>>>> `ListTypeInfo(elem_type_info)` to the construct method of
>>> `StateDescriptor`.
>>>>>>>
>>>>>>> b) I think we need to add the `MergingState` and `AppendingState`
>>>>>>> interfaces, and then extract the `get` and `add` methods from
>>> `ListState`,
>>>>>>> `AggregatingState`, and `ReducingState` into `AppendingState`. Then
>>> let
>>>>>>> `ListState`, `AggregatingState` and `ReducingState` inherit
>>> `MergingState`.
>>>>>>>
>>>>>>> Best,
>>>>>>> Xingbo
>>>>>>>
>>>>>>> Shuiqiang Chen <[hidden email]> 于2020年12月11日周五 下午9:44写道:
>>>>>>>
>>>>>>>> Hi devs,
>>>>>>>>
>>>>>>>> In FLIP-130, we have already supported Python DataStream stateless
>>> APIs so
>>>>>>>> that users are able to perform some basic data transformations. To
>>>>>>>> implement more complex data processing, we need to provide state
>>> access
>>>>>>>> support. So I would propose to add state access APIs in Python
>>> DataStream
>>>>>>>> API to support stateful operations on a KeyedStream. More details
>>> are in
>>>>>>>> the FLIP wiki page [1].
>>>>>>>>
>>>>>>>> Any feedback will be highly appreciated!
>>>>>>>>
>>>>>>>> [1]
>>>>>>>>
>>>>>>>>
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Shuiqiang
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>