[DISCUSS] Allow streaming operators to use managed memory

Jark Wu-2
Hi all,

I found that currently the managed memory can only be used in 3 workloads
[1]:
- state backends for streaming jobs
- sorting, hash tables for batch jobs
- python UDFs

And the configuration option `taskmanager.memory.managed.consumer-weights`
only allows values: PYTHON and DATAPROC (state in streaming or algorithms
in batch).
I'm confused why it doesn't allow streaming operators to use managed memory
for purposes other than state backends.
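
To make the option's format concrete, here is a minimal Java sketch that parses a value such as `DATAPROC:70,PYTHON:30` (the default quoted later in this thread) into a map of consumer kind to weight. The class and method names are hypothetical, and this is not Flink's own parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for a "KIND:weight,KIND:weight" string; not Flink's implementation. */
final class ConsumerWeightsSketch {

    /** Parses e.g. "DATAPROC:70,PYTHON:30" into an insertion-ordered map. */
    static Map<String, Integer> parse(String value) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected KIND:weight but got '" + entry + "'");
            }
            int weight = Integer.parseInt(parts[1].trim());
            if (weight < 0) {
                throw new IllegalArgumentException("Weight must not be negative: " + entry);
            }
            weights.put(parts[0].trim(), weight);
        }
        return weights;
    }

    public static void main(String[] args) {
        System.out.println(parse("DATAPROC:70,PYTHON:30"));
    }
}
```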

The background is that we are planning to use some batch algorithms
(sorting & bytes hash table) to improve the performance of streaming SQL
operators, especially for the mini-batch operators.
Currently, the mini-batch operators buffer input records and
accumulators on the heap (i.e. in a Java HashMap), which is inefficient and
carries a risk of full GCs and OOM errors.
With managed memory, we can buffer more data without worrying about OOM
and improve performance considerably.
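
The heap-based buffering pattern described above can be sketched roughly as follows. This is a simplified, hypothetical stand-in for a mini-batch sum aggregation, not the actual Flink operator: the class name, the record-count trigger, and the `emitted` map (standing in for state or downstream output) are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mini-batch sum that keeps per-key accumulators in a heap HashMap. */
final class HeapMiniBatchSketch {
    // Heap-resident buffer: bounded only by record count, not by bytes.
    private final Map<String, Long> buffer = new HashMap<>();
    // Stand-in for state or downstream output (assumption for this sketch).
    private final Map<String, Long> emitted = new HashMap<>();
    private final int maxBufferedRecords;
    private int bufferedRecords;

    HeapMiniBatchSketch(int maxBufferedRecords) {
        this.maxBufferedRecords = maxBufferedRecords;
    }

    /** Folds the record into the per-key accumulator and flushes when the batch is full. */
    void add(String key, long value) {
        buffer.merge(key, value, Long::sum);
        bufferedRecords++;
        if (bufferedRecords >= maxBufferedRecords) {
            flush();
        }
    }

    /** Applies the buffered partial sums once per key, then clears the buffer. */
    void flush() {
        buffer.forEach((key, partialSum) -> emitted.merge(key, partialSum, Long::sum));
        buffer.clear();
        bufferedRecords = 0;
    }

    Map<String, Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        HeapMiniBatchSketch op = new HeapMiniBatchSketch(3);
        op.add("a", 1L);
        op.add("b", 2L);
        op.add("a", 3L); // third record triggers a flush
        System.out.println(op.emitted());
    }
}
```

Because the buffer is limited by record count rather than by size in bytes, large keys or accumulators put pressure on the JVM heap, which is the GC and OOM risk mentioned above.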

What do you think about allowing streaming operators to use managed memory
and exposing it in the configuration?

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory

Re: [DISCUSS] Allow streaming operators to use managed memory

Aljoscha Krettek-2
I agree, we should allow streaming operators to use managed memory for
other use cases.

Do you think we need an additional "consumer" setting or that they would
just use `DATAPROC` and decide by themselves what to use the memory for?

Best,
Aljoscha


Re: [DISCUSS] Allow streaming operators to use managed memory

Jark Wu-2
Hi Aljoscha,

I think we may need to split `DATAPROC` into `OPERATOR` and
`STATE_BACKEND`, because they have different scopes (slot vs. operator).
But @Xintong Song may have more insight on this.

Best,
Jark



Re: [DISCUSS] Allow streaming operators to use managed memory

Xintong Song
+1 for allowing streaming operators to use managed memory.

As for the consumer names, I'm afraid using `DATAPROC` for both streaming
ops and state backends will not work. Currently, the RocksDB state backend
uses a shared piece of memory for all the states within a slot. It's not the
operator's decision how much memory it uses for the states.

I would suggest the following. (IIUC, the same as what Jark proposed)
* `OPERATOR` for both streaming and batch operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backend or batch operators if
`STATE_BACKEND` or `OPERATOR` are not specified.
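
One way to read the legacy-key rule in the list above is sketched below in Java. It is an assumption about how the fallback could work, not the implementation that was eventually merged: a `DATAPROC` weight is copied to `OPERATOR` and `STATE_BACKEND` only where those keys are not set explicitly. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical resolution of the legacy DATAPROC key; not Flink's implementation. */
final class LegacyKeyFallbackSketch {

    /** Returns the effective weights, using DATAPROC only where the new keys are missing. */
    static Map<String, Integer> resolve(Map<String, Integer> configured) {
        Map<String, Integer> effective = new LinkedHashMap<>(configured);
        Integer legacy = effective.remove("DATAPROC");
        if (legacy != null) {
            // Explicit OPERATOR / STATE_BACKEND entries take precedence over the legacy key.
            effective.putIfAbsent("OPERATOR", legacy);
            effective.putIfAbsent("STATE_BACKEND", legacy);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, Integer> configured = new LinkedHashMap<>();
        configured.put("DATAPROC", 70);
        configured.put("PYTHON", 30);
        System.out.println(resolve(configured));
    }
}
```

Under this reading, an existing `DATAPROC:70,PYTHON:30` configuration keeps working unchanged, while users who set `OPERATOR` or `STATE_BACKEND` explicitly override the legacy value for that kind only.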

Thank you~

Xintong Song




Re: [DISCUSS] Allow streaming operators to use managed memory

Jark Wu-2
+1 to Xintong's proposal!

Best,
Jark


Re: [DISCUSS] Allow streaming operators to use managed memory

Jingsong Li
+1 for allowing streaming operators to use managed memory.

Memory usage in streaming jobs needs some hierarchy, and the bottom layer is
undoubtedly the current StateBackend.
Letting stream operators use managed memory freely will unify the memory
management model and give operators more freedom.

Xintong's proposal looks good to me. +1 to split `DATAPROC` into
`STATE_BACKEND` and `OPERATOR`.

Best,
Jingsong



--
Best, Jingsong Lee

Re: [DISCUSS] Allow streaming operators to use managed memory

Till Rohrmann
+1 for Jark's and Xintong's proposal.

Would the default weight for OPERATOR and STATE_BACKEND be the same value?

Cheers,
Till


Re: [DISCUSS] Allow streaming operators to use managed memory

Xintong Song
> Would the default weight for OPERATOR and STATE_BACKEND be the same value?

I would say yes, to align with the previous behavior.


Thank you~

Xintong Song




Re: [DISCUSS] Allow streaming operators to use managed memory

Jark Wu-2
Thanks all for the discussion.

I have created an issue FLINK-20860 [1] to support this.

In conclusion, we will extend the configuration option
`taskmanager.memory.managed.consumer-weights` with 2 more consumer
kinds, OPERATOR and STATE_BACKEND. The available consumer kinds will be:

* `OPERATOR` for both streaming and batch operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backends or batch operators if
`STATE_BACKEND` or `OPERATOR` are not specified.

The previous default value is DATAPROC:70,PYTHON:30; the new default value
will be OPERATOR:70,STATE_BACKEND:70,PYTHON:30.

The weights for OPERATOR and STATE_BACKEND will have the same value to align
with the previous behavior.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-20860

Re: [DISCUSS] Allow streaming operators to use managed memory

Yun Tang
I think using managed memory within streaming operators is a good idea. I just have a question about the last conclusion:

If both OPERATOR and STATE_BACKEND are set to 70 to align with the previous behavior, what will happen if one slot contains both a streaming operator that uses managed memory and a state backend?

As you can see, previously DATAPROC + PYTHON = 100, which covers the situation where one slot contains both managed memory consumers, Python and the state backend.
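
One possible answer, which the thread has not confirmed at this point, is that the weights are relative rather than percentages: they would be normalized over the consumer kinds actually present in a slot. The Java sketch below works through that assumption; the class name, method name, and the normalization rule itself are illustrative and are not taken from Flink's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical share computation: weights normalized over the kinds present in a slot. */
final class SlotShareSketch {

    /** Returns each present kind's fraction of the slot's managed memory. */
    static Map<String, Double> shares(Map<String, Integer> weights, Set<String> presentInSlot) {
        int total = 0;
        for (String kind : presentInSlot) {
            total += weights.getOrDefault(kind, 0);
        }
        Map<String, Double> result = new LinkedHashMap<>();
        if (total == 0) {
            return result; // no weighted consumer in this slot
        }
        for (String kind : presentInSlot) {
            result.put(kind, weights.getOrDefault(kind, 0) / (double) total);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("OPERATOR", 70);
        weights.put("STATE_BACKEND", 70);
        weights.put("PYTHON", 30);
        System.out.println(shares(weights, Set.of("OPERATOR", "STATE_BACKEND")));
    }
}
```

Under this assumption, a slot with both an operator consumer and a state backend would give each half of the managed memory (70 / 140), while a slot with only a state backend and Python would keep the previous 70/30 split.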

Best
Yun Tang
________________________________
From: Jark Wu <[hidden email]>
Sent: Wednesday, January 6, 2021 13:51
To: dev <[hidden email]>
Subject: Re: [DISCUSS] Allow streaming operators to use managed memory

Thanks all for the discussion.

I have created an issue FLINK-20860 [1] to support this.

In conclusion, we will extend the configuration
`taskmanager.memory.managed.consumer-weights` to have 2 more consumer
kinds: OPERATOR and STATE_BACKEND, the available consumer kinds will be :

* `OPERATOR` for both streaming and bath operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backend or batch operators if
`STATE_BACKEND` or `OPERATOR` are not specified.

The previous default value is DATAPROC:70,PYTHON:30, the new default value
will be OPERATOR:70,STATE_BACKEND:70,PYTHON:30.

The weight for OPERATOR and STATE_BACKEND will be the same value to align
with previous behaviors.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-20860

On Tue, 5 Jan 2021 at 18:35, Xintong Song <[hidden email]> wrote:

> >
> > Would the default weight for OPERATOR and STATE_BACKEND be the same
> value?
> >
>
> I would say yes, to align with previous behaviors.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jan 5, 2021 at 5:51 PM Till Rohrmann <[hidden email]> wrote:
>
> > +1 for Jark's and Xintong's proposal.
> >
> > Would the default weight for OPERATOR and STATE_BACKEND be the same
> > value?
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 5, 2021 at 6:39 AM Jingsong Li <[hidden email]> wrote:
> >
> > > +1 for allowing streaming operators to use managed memory.
> > >
> > > The memory use of streams requires some hierarchy, and the bottom layer is
> > > undoubtedly the current StateBackend.
> > > Letting the stream operators freely use the managed memory will unify the
> > > memory management model and give the operators free space.
> > >
> > > Xintong's proposal looks good to me. +1 to split `DATAPROC` into
> > > `STATE_BACKEND` and `OPERATOR`.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Jan 5, 2021 at 12:33 PM Jark Wu <[hidden email]> wrote:
> > >
> > > > +1 to Xintong's proposal!
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Tue, 5 Jan 2021 at 12:13, Xintong Song <[hidden email]> wrote:
> > > >
> > > > > +1 for allowing streaming operators to use managed memory.
> > > > >
> > > > > As for the consumer names, I'm afraid using `DATAPROC` for both streaming
> > > > > ops and state backends will not work. Currently, RocksDB state backend uses
> > > > > a shared piece of memory for all the states within that slot. It's not the
> > > > > operator's decision how much memory it uses for the states.
> > > > >
> > > > > I would suggest the following. (IIUC, the same as what Jark proposed)
> > > > > * `OPERATOR` for both streaming and batch operators
> > > > > * `STATE_BACKEND` for state backends
> > > > > * `PYTHON` for python processes
> > > > > * `DATAPROC` as a legacy key for state backend or batch operators if
> > > > > `STATE_BACKEND` or `OPERATOR` are not specified.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Jan 5, 2021 at 11:23 AM Jark Wu <[hidden email]> wrote:
> > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > I think we may need to divide `DATAPROC` into `OPERATOR` and
> > > > > > `STATE_BACKEND`, because they have different scope (slot vs. operator).
> > > > > > But @Xintong Song <[hidden email]> may have more insights on it.
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > >
> > > > > > On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek <[hidden email]> wrote:
> > > > > >
> > > > > > >> I agree, we should allow streaming operators to use managed memory for
> > > > > > >> other use cases.
> > > > > > >>
> > > > > > >> Do you think we need an additional "consumer" setting or that they would
> > > > > > >> just use `DATAPROC` and decide by themselves what to use the memory for?
> > > > > >>
> > > > > >> Best,
> > > > > >> Aljoscha
> > > > > >>
> > > > > >> On 2020/12/22 17:14, Jark Wu wrote:
> > > > > >> >Hi all,
> > > > > >> >
> > > > > > >> >I found that currently the managed memory can only be used in 3 workloads
> > > > > > >> >[1]:
> > > > > > >> >- state backends for streaming jobs
> > > > > > >> >- sorting, hash tables for batch jobs
> > > > > > >> >- python UDFs
> > > > > > >> >
> > > > > > >> >And the configuration option `taskmanager.memory.managed.consumer-weights`
> > > > > > >> >only allows values: PYTHON and DATAPROC (state in streaming or algorithms
> > > > > > >> >in batch).
> > > > > > >> >I'm confused why it doesn't allow streaming operators to use managed memory
> > > > > > >> >for purposes other than state backends.
> > > > > > >> >
> > > > > > >> >The background is that we are planning to use some batch algorithms
> > > > > > >> >(sorting & bytes hash table) to improve the performance of streaming SQL
> > > > > > >> >operators, especially for the mini-batch operators.
> > > > > > >> >Currently, the mini-batch operators are buffering input records and
> > > > > > >> >accumulators in heap (i.e. Java HashMap) which is not efficient and there
> > > > > > >> >are potential risks of full GC and OOM.
> > > > > > >> >With the managed memory, we can fully use the memory to buffer more data
> > > > > > >> >without worrying about OOM and improve the performance a lot.
> > > > > > >> >
> > > > > > >> >What do you think about allowing streaming operators to use managed memory
> > > > > > >> >and exposing it in configuration.
> > > > > > >> >
> > > > > > >> >Best,
> > > > > > >> >Jark
> > > > > > >> >
> > > > > > >> >[1]:
> > > > > > >> >https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>

Re: [DISCUSS] Allow streaming operators to use managed memory

Xintong Song
Thanks for driving the discussion, @Jark. The conclusion LGTM.

@Yun,
Since streaming operators did not use managed memory previously, I
don't think it's possible for any use case where streaming operators use
managed memory to align with the previous behavior.
No matter how the consumer weights are configured, some of the managed
memory has to be transferred from the previous consumers to the streaming
operators that newly use managed memory.

Thank you~

Xintong Song
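
To make the point above concrete, here is a minimal Java sketch of how the weights could translate into memory shares. It assumes that a slot's managed memory is divided among the consumer kinds actually present in that slot, in proportion to their configured weights; that matches how the Flink documentation describes the existing DATAPROC/PYTHON split, but it is stated here as an assumption, and `ManagedMemoryShareSketch`, `defaultWeights` and `shares` are hypothetical names rather than Flink APIs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Flink code: derives per-consumer shares of a
// slot's managed memory from the configured consumer weights.
final class ManagedMemoryShareSketch {

    // The new default value proposed in this thread.
    static Map<String, Integer> defaultWeights() {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("OPERATOR", 70);
        weights.put("STATE_BACKEND", 70);
        weights.put("PYTHON", 30);
        return weights;
    }

    // Assumption: share = weight / sum of the weights of the kinds in use.
    static Map<String, Double> shares(Map<String, Integer> weights, Set<String> inUse) {
        int total = 0;
        for (String kind : inUse) {
            Integer weight = weights.get(kind);
            if (weight == null) {
                throw new IllegalArgumentException("No weight configured for " + kind);
            }
            total += weight;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (String kind : inUse) {
            result.put(kind, total == 0 ? 0.0 : weights.get(kind) / (double) total);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = defaultWeights();
        // Slot with a state backend and Python only: same split as DATAPROC:70,PYTHON:30.
        System.out.println(shares(weights,
                new LinkedHashSet<>(Arrays.asList("STATE_BACKEND", "PYTHON"))));
        // Slot with a managed-memory streaming operator and a state backend.
        System.out.println(shares(weights,
                new LinkedHashSet<>(Arrays.asList("OPERATOR", "STATE_BACKEND"))));
    }
}
```

Under that assumption, a slot with only a state backend and a Python process keeps the old 70/30 split, while a slot that also hosts a managed-memory streaming operator necessarily gives the state backend a smaller share than before (an even split between OPERATOR and STATE_BACKEND with equal weights).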



On Wed, Jan 6, 2021 at 2:38 PM Yun Tang <[hidden email]> wrote:

> I think using managed memory within streaming operators is a good idea. I
> just have a question about the last conclusion:
>
> If both OPERATOR and STATE_BACKEND are set to 70 to align with the previous
> behavior, what will happen if one slot has both kinds of consumers, i.e. a
> streaming operator that uses managed memory and a state backend?
>
> As you can see, previously DATAPROC + PYTHON = 100, which describes the
> situation where one slot has both kinds of consumers, i.e. a Python process
> and a state backend that use managed memory.
>
> Best
> Yun Tang