Hi all,
I found that currently the managed memory can only be used in 3 workloads [1]:
- state backends for streaming jobs
- sorting, hash tables for batch jobs
- python UDFs

And the configuration option `taskmanager.memory.managed.consumer-weights` only allows the values PYTHON and DATAPROC (state in streaming or algorithms in batch). I'm confused why it doesn't allow streaming operators to use managed memory for purposes other than state backends.

The background is that we are planning to use some batch algorithms (sorting & bytes hash table) to improve the performance of streaming SQL operators, especially the mini-batch operators. Currently, the mini-batch operators buffer input records and accumulators on the heap (i.e. in a Java HashMap), which is not efficient and carries potential risks of full GC and OOM. With managed memory, we could fully use the memory to buffer more data without worrying about OOM and improve performance a lot.

What do you think about allowing streaming operators to use managed memory and exposing it in the configuration?

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory
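For reference, the current setup from [1] looks roughly like this in flink-conf.yaml (a sketch; only the PYTHON and DATAPROC weights are recognized today, and the fraction value is just the documented default):

    # Managed memory is sized as a fraction of the total Flink memory.
    taskmanager.memory.managed.fraction: 0.4
    # Weights for the currently supported consumer kinds.
    taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30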
I agree, we should allow streaming operators to use managed memory for
other use cases.

Do you think we need an additional "consumer" setting, or would they just use `DATAPROC` and decide by themselves what to use the memory for?

Best,
Aljoscha

On 2020/12/22 17:14, Jark Wu wrote:
> [...]
Hi Aljoscha,
I think we may need to divide `DATAPROC` into `OPERATOR` and `STATE_BACKEND`, because they have different scopes (slot vs. operator). But @Xintong Song <[hidden email]> may have more insights on it.

Best,
Jark

On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek <[hidden email]> wrote:
> [...]
+1 for allowing streaming operators to use managed memory.
As for the consumer names, I'm afraid using `DATAPROC` for both streaming ops and state backends will not work. Currently, the RocksDB state backend uses a shared piece of memory for all the states within a slot. It's not the operator's decision how much memory it uses for the states.

I would suggest the following (IIUC, the same as what Jark proposed):
* `OPERATOR` for both streaming and batch operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backends or batch operators if `STATE_BACKEND` or `OPERATOR` are not specified

Thank you~
Xintong Song

On Tue, Jan 5, 2021 at 11:23 AM Jark Wu <[hidden email]> wrote:
> [...]
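To make the legacy-key behavior concrete, here is a small hypothetical sketch (made-up names, not the actual Flink implementation) of how the fallback could be resolved:

    import java.util.Map;

    class ConsumerWeights {
        // Resolve the weight for a consumer kind, falling back to the legacy
        // DATAPROC weight for the two kinds it used to cover.
        static int resolveWeight(Map<String, Integer> configured, String consumer) {
            if (configured.containsKey(consumer)) {
                return configured.get(consumer); // explicitly configured weight wins
            }
            if (("OPERATOR".equals(consumer) || "STATE_BACKEND".equals(consumer))
                    && configured.containsKey("DATAPROC")) {
                return configured.get("DATAPROC"); // legacy fallback
            }
            return 0; // nothing configured for this consumer kind
        }
    }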
+1 to Xintong's proposal!
Best,
Jark

On Tue, 5 Jan 2021 at 12:13, Xintong Song <[hidden email]> wrote:
> [...]
+1 for allowing streaming operators to use managed memory.
The memory use of streaming requires some hierarchy, and the bottom layer is undoubtedly the current StateBackend. Letting the stream operators freely use managed memory will make the memory management model unified and give the operators more freedom.

Xintong's proposal looks good to me. +1 to splitting `DATAPROC` into `STATE_BACKEND` and `OPERATOR`.

Best,
Jingsong

On Tue, Jan 5, 2021 at 12:33 PM Jark Wu <[hidden email]> wrote:
> [...]
+1 for Jark's and Xintong's proposal.
Would the default weight for OPERATOR and STATE_BACKEND be the same value?

Cheers,
Till

On Tue, Jan 5, 2021 at 6:39 AM Jingsong Li <[hidden email]> wrote:
> [...]
> Would the default weight for OPERATOR and STATE_BACKEND be the same value?

I would say yes, to align with previous behaviors.

Thank you~
Xintong Song

On Tue, Jan 5, 2021 at 5:51 PM Till Rohrmann <[hidden email]> wrote:
> [...]
Thanks all for the discussion.
I have created an issue FLINK-20860 [1] to support this.

In conclusion, we will extend the configuration `taskmanager.memory.managed.consumer-weights` with 2 more consumer kinds, OPERATOR and STATE_BACKEND. The available consumer kinds will be:

* `OPERATOR` for both streaming and batch operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backends or batch operators if `STATE_BACKEND` or `OPERATOR` are not specified

The previous default value is DATAPROC:70,PYTHON:30; the new default value will be OPERATOR:70,STATE_BACKEND:70,PYTHON:30. The weight for OPERATOR and STATE_BACKEND will be the same value to align with previous behaviors.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-20860

On Tue, 5 Jan 2021 at 18:35, Xintong Song <[hidden email]> wrote:
> [...]
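As a sketch of what the new option could look like in flink-conf.yaml once FLINK-20860 is implemented (the override values below are just an example, not a recommendation):

    # Proposed new default: OPERATOR:70,STATE_BACKEND:70,PYTHON:30
    # A user who wants to favor state backends over operators could override it, e.g.:
    taskmanager.memory.managed.consumer-weights: OPERATOR:50,STATE_BACKEND:100,PYTHON:30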
I think using managed memory within streaming operators is a good idea, and I just have a question about the last conclusion:
If both OPERATOR and STATE_BACKEND are set to 70 to align with the previous behavior, what will happen if one slot has both consumers, a managed-memory streaming operator and a state backend?

As you can see, previously DATAPROC + PYTHON = 100, which describes the situation where one slot has both a managed python consumer and a state backend consumer.

Best
Yun Tang

________________________________
From: Jark Wu <[hidden email]>
Sent: Wednesday, January 6, 2021 13:51
To: dev <[hidden email]>
Subject: Re: [DISCUSS] Allow streaming operators to use managed memory

[...]
Thanks for driving the discussion, @Jark. The conclusion LGTM.
@Yun,

Since the streaming operators did not use managed memory previously, I don't think it's possible for any use cases with managed memory streaming operators to align with the previous behaviors. No matter how the consumer weights are configured, some of the managed memory has to be transferred from the previous consumers to the new managed memory streaming operators.

Thank you~
Xintong Song

On Wed, Jan 6, 2021 at 2:38 PM Yun Tang <[hidden email]> wrote:
> [...]
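To make the trade-off concrete: my understanding is that a slot's managed memory is split among the consumer kinds actually present in that slot, proportionally to their weights. A small sketch (not Flink code, and the sharing rule is an assumption on my side) of what the proposed defaults would mean for a slot that has all three consumer kinds:

    import java.util.LinkedHashMap;
    import java.util.Map;

    class ManagedMemoryShares {
        public static void main(String[] args) {
            // Proposed default weights from the conclusion above.
            Map<String, Integer> weights = new LinkedHashMap<>();
            weights.put("OPERATOR", 70);
            weights.put("STATE_BACKEND", 70);
            weights.put("PYTHON", 30);

            // Assumption: only consumers present in the slot take part in the split.
            int total = weights.values().stream().mapToInt(Integer::intValue).sum(); // 170
            weights.forEach((kind, w) ->
                    System.out.printf("%s: %.1f%% of the slot's managed memory%n",
                            kind, 100.0 * w / total));
            // OPERATOR: 41.2%, STATE_BACKEND: 41.2%, PYTHON: 17.6%
        }
    }

So compared to the old DATAPROC:70,PYTHON:30 split, the state backend's share shrinks whenever operators in the same slot also request managed memory, which is exactly the transfer Xintong describes.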