[DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

[DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Guowei Ma
Hi, all


In the Flink 1.12 we introduce the TM merge shuffle. But the out-of-the-box
experience of using TM merge shuffle is not very good. The main reason is
that the default configuration always makes users encounter OOM [1]. So we
hope to introduce a managed memory pool for TM merge shuffle to avoid the
problem.
Goals

   1. Don't affect the streaming and pipelined-shuffle-only batch setups.
   2. Don't mix memory with different life cycle in the same pool. E.g.,
   write buffers needed by running tasks and read buffer needed even after
   tasks being finished.
   3. User can use the TM merge shuffle with default memory configurations.
   (May need further tunings for performance optimization, but should not fail
   with the default configurations.)

Proposal

   1. Introduce a configuration `taskmanager.memory.network.batch-read` to
   specify the size of this memory pool. The default value is 16m.
   2. Allocate the pool lazily. It means that the memory pool would be
   allocated when the TM merge shuffle is used at the first time.
   3. This pool size will not be add up to the TM's total memory size, but
   will be considered part of `taskmanager.memory.framework.off-heap.size`. We
   need to check that the pool size is not larger than the framework off-heap
   size, if TM merge shuffle is enabled.


In this default configuration, the allocation of the memory pool is almost
impossible to fail. Currently the default framework’s off-heap memory is
128m, which is mainly used by Netty. But after we introduced zero copy, the
usage of it has been reduced, and you can refer to the detailed data [2].
Known Limitation
Usability for increasing the memory pool size

In addition to increasing `taskmanager.memory.network.batch-read`, the user
may also need to adjust `taskmanager.memory.framework.off-heap.size` at the
same time. It also means that once the user forgets this, it is likely to
fail the check when allocating the memory pool.


So in the following two situations, we will still prompt the user to
increase the size of `framework.off-heap.size`.

   1. `taskmanager.memory.network.batch-read` is bigger than
   `taskmanager.memory.framework.off-heap.size`
   2. Allocating the pool encounters the OOM.


An alternative is that when the user adjusts the size of the memory pool,
the system automatically adjusts it. But we are not entierly sure about
this, given its implicity and complicating the memory configurations.
Potential memory waste

In the first step, the memory pool will not be released once allocated. This
means in the first step, even if there is no subsequent batch job, the
pooled memory cannot be used by other consumers.


We are not releasing the pool in the first step due to the concern that
frequently allocating/deallocating the entire pool may increase the GC
pressue. Investitations on how to dynamically release the pool when it's no
longer needed is considered a future follow-up.


Looking forward to your feedback.



[1] https://issues.apache.org/jira/browse/FLINK-20740

[2] https://github.com/apache/flink/pull/7368.
Best,
Guowei
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Stephan Ewen-4
Thanks Guowei, for the proposal.

As discussed offline already, I think this sounds good.

One thought is that 16m sounds very small for a default read buffer pool.
How risky do you think it is to increase this to 32m or 64m?

Best,
Stephan


On Fri, Mar 5, 2021 at 4:32 AM Guowei Ma <[hidden email]> wrote:

> Hi, all
>
>
> In the Flink 1.12 we introduce the TM merge shuffle. But the
> out-of-the-box experience of using TM merge shuffle is not very good. The
> main reason is that the default configuration always makes users encounter
> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
> to avoid the problem.
> Goals
>
>    1. Don't affect the streaming and pipelined-shuffle-only batch setups.
>    2. Don't mix memory with different life cycle in the same pool. E.g.,
>    write buffers needed by running tasks and read buffer needed even after
>    tasks being finished.
>    3. User can use the TM merge shuffle with default memory
>    configurations. (May need further tunings for performance optimization, but
>    should not fail with the default configurations.)
>
> Proposal
>
>    1. Introduce a configuration `taskmanager.memory.network.batch-read`
>    to specify the size of this memory pool. The default value is 16m.
>    2. Allocate the pool lazily. It means that the memory pool would be
>    allocated when the TM merge shuffle is used at the first time.
>    3. This pool size will not be add up to the TM's total memory size,
>    but will be considered part of
>    `taskmanager.memory.framework.off-heap.size`. We need to check that the
>    pool size is not larger than the framework off-heap size, if TM merge
>    shuffle is enabled.
>
>
> In this default configuration, the allocation of the memory pool is almost
> impossible to fail. Currently the default framework’s off-heap memory is
> 128m, which is mainly used by Netty. But after we introduced zero copy, the
> usage of it has been reduced, and you can refer to the detailed data [2].
> Known Limitation
> Usability for increasing the memory pool size
>
> In addition to increasing `taskmanager.memory.network.batch-read`, the
> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
> at the same time. It also means that once the user forgets this, it is
> likely to fail the check when allocating the memory pool.
>
>
> So in the following two situations, we will still prompt the user to
> increase the size of `framework.off-heap.size`.
>
>    1. `taskmanager.memory.network.batch-read` is bigger than
>    `taskmanager.memory.framework.off-heap.size`
>    2. Allocating the pool encounters the OOM.
>
>
> An alternative is that when the user adjusts the size of the memory pool,
> the system automatically adjusts it. But we are not entierly sure about
> this, given its implicity and complicating the memory configurations.
> Potential memory waste
>
> In the first step, the memory pool will not be released once allocated. This
> means in the first step, even if there is no subsequent batch job, the
> pooled memory cannot be used by other consumers.
>
>
> We are not releasing the pool in the first step due to the concern that
> frequently allocating/deallocating the entire pool may increase the GC
> pressue. Investitations on how to dynamically release the pool when it's no
> longer needed is considered a future follow-up.
>
>
> Looking forward to your feedback.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20740
>
> [2] https://github.com/apache/flink/pull/7368.
> Best,
> Guowei
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Stephan Ewen
In reply to this post by Guowei Ma
Thanks Guowei, for the proposal.

As discussed offline already, I think this sounds good.

One thought is that 16m sounds very small for a default read buffer pool.
How risky do you think it is to increase this to 32m or 64m?

Best,
Stephan

On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma <[hidden email]> wrote:

> Hi, all
>
>
> In the Flink 1.12 we introduce the TM merge shuffle. But the
> out-of-the-box experience of using TM merge shuffle is not very good. The
> main reason is that the default configuration always makes users encounter
> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
> to avoid the problem.
> Goals
>
>    1. Don't affect the streaming and pipelined-shuffle-only batch setups.
>    2. Don't mix memory with different life cycle in the same pool. E.g.,
>    write buffers needed by running tasks and read buffer needed even after
>    tasks being finished.
>    3. User can use the TM merge shuffle with default memory
>    configurations. (May need further tunings for performance optimization, but
>    should not fail with the default configurations.)
>
> Proposal
>
>    1. Introduce a configuration `taskmanager.memory.network.batch-read`
>    to specify the size of this memory pool. The default value is 16m.
>    2. Allocate the pool lazily. It means that the memory pool would be
>    allocated when the TM merge shuffle is used at the first time.
>    3. This pool size will not be add up to the TM's total memory size,
>    but will be considered part of
>    `taskmanager.memory.framework.off-heap.size`. We need to check that the
>    pool size is not larger than the framework off-heap size, if TM merge
>    shuffle is enabled.
>
>
> In this default configuration, the allocation of the memory pool is almost
> impossible to fail. Currently the default framework’s off-heap memory is
> 128m, which is mainly used by Netty. But after we introduced zero copy, the
> usage of it has been reduced, and you can refer to the detailed data [2].
> Known Limitation
> Usability for increasing the memory pool size
>
> In addition to increasing `taskmanager.memory.network.batch-read`, the
> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
> at the same time. It also means that once the user forgets this, it is
> likely to fail the check when allocating the memory pool.
>
>
> So in the following two situations, we will still prompt the user to
> increase the size of `framework.off-heap.size`.
>
>    1. `taskmanager.memory.network.batch-read` is bigger than
>    `taskmanager.memory.framework.off-heap.size`
>    2. Allocating the pool encounters the OOM.
>
>
> An alternative is that when the user adjusts the size of the memory pool,
> the system automatically adjusts it. But we are not entierly sure about
> this, given its implicity and complicating the memory configurations.
> Potential memory waste
>
> In the first step, the memory pool will not be released once allocated. This
> means in the first step, even if there is no subsequent batch job, the
> pooled memory cannot be used by other consumers.
>
>
> We are not releasing the pool in the first step due to the concern that
> frequently allocating/deallocating the entire pool may increase the GC
> pressue. Investitations on how to dynamically release the pool when it's no
> longer needed is considered a future follow-up.
>
>
> Looking forward to your feedback.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20740
>
> [2] https://github.com/apache/flink/pull/7368.
> Best,
> Guowei
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Till Rohrmann
Thanks for this proposal Guowei. +1 for it.

Concerning the default size, maybe we can run some experiments and see how
the system behaves with different pool sizes.

Cheers,
Till

On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen <[hidden email]> wrote:

> Thanks Guowei, for the proposal.
>
> As discussed offline already, I think this sounds good.
>
> One thought is that 16m sounds very small for a default read buffer pool.
> How risky do you think it is to increase this to 32m or 64m?
>
> Best,
> Stephan
>
> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma <[hidden email]> wrote:
>
>> Hi, all
>>
>>
>> In the Flink 1.12 we introduce the TM merge shuffle. But the
>> out-of-the-box experience of using TM merge shuffle is not very good. The
>> main reason is that the default configuration always makes users encounter
>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>> to avoid the problem.
>> Goals
>>
>>    1. Don't affect the streaming and pipelined-shuffle-only batch setups.
>>    2. Don't mix memory with different life cycle in the same pool. E.g.,
>>    write buffers needed by running tasks and read buffer needed even after
>>    tasks being finished.
>>    3. User can use the TM merge shuffle with default memory
>>    configurations. (May need further tunings for performance optimization, but
>>    should not fail with the default configurations.)
>>
>> Proposal
>>
>>    1. Introduce a configuration `taskmanager.memory.network.batch-read`
>>    to specify the size of this memory pool. The default value is 16m.
>>    2. Allocate the pool lazily. It means that the memory pool would be
>>    allocated when the TM merge shuffle is used at the first time.
>>    3. This pool size will not be add up to the TM's total memory size,
>>    but will be considered part of
>>    `taskmanager.memory.framework.off-heap.size`. We need to check that the
>>    pool size is not larger than the framework off-heap size, if TM merge
>>    shuffle is enabled.
>>
>>
>> In this default configuration, the allocation of the memory pool is
>> almost impossible to fail. Currently the default framework’s off-heap
>> memory is 128m, which is mainly used by Netty. But after we introduced zero
>> copy, the usage of it has been reduced, and you can refer to the detailed
>> data [2].
>> Known Limitation
>> Usability for increasing the memory pool size
>>
>> In addition to increasing `taskmanager.memory.network.batch-read`, the
>> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
>> at the same time. It also means that once the user forgets this, it is
>> likely to fail the check when allocating the memory pool.
>>
>>
>> So in the following two situations, we will still prompt the user to
>> increase the size of `framework.off-heap.size`.
>>
>>    1. `taskmanager.memory.network.batch-read` is bigger than
>>    `taskmanager.memory.framework.off-heap.size`
>>    2. Allocating the pool encounters the OOM.
>>
>>
>> An alternative is that when the user adjusts the size of the memory pool,
>> the system automatically adjusts it. But we are not entierly sure about
>> this, given its implicity and complicating the memory configurations.
>> Potential memory waste
>>
>> In the first step, the memory pool will not be released once allocated. This
>> means in the first step, even if there is no subsequent batch job, the
>> pooled memory cannot be used by other consumers.
>>
>>
>> We are not releasing the pool in the first step due to the concern that
>> frequently allocating/deallocating the entire pool may increase the GC
>> pressue. Investitations on how to dynamically release the pool when it's no
>> longer needed is considered a future follow-up.
>>
>>
>> Looking forward to your feedback.
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20740
>>
>> [2] https://github.com/apache/flink/pull/7368.
>> Best,
>> Guowei
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Guowei Ma
Hi, all

Thanks all for your suggestions and feedback.
I think it is a good idea that we increase the default size of the
separated pool by testing. I am fine with adding the suffix(".size") to the
config name, which makes it more clear to the user.
But I am a little worried about adding a prefix("framework") because
currently the tm shuffle service is only a shuffle-plugin, which is not a
part of the framework. So maybe we could add a clear explanation in the
document?

Best,
Guowei


On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) <[hidden email]> wrote:

> Thanks for the suggestions. I will do some tests and share the results
> after the implementation is ready. Then we can give a proper default value.
>
> Best,
> Yingjie
>
> ------------------------------------------------------------------
> 发件人:Till Rohrmann<[hidden email]>
> 日 期:2021年03月05日 23:03:10
> 收件人:Stephan Ewen<[hidden email]>
> 抄 送:dev<[hidden email]>; user<[hidden email]>; Xintong Song<
> [hidden email]>; 曹英杰(北牧)<[hidden email]>; Guowei Ma<
> [hidden email]>
> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
> shuffle
>
> Thanks for this proposal Guowei. +1 for it.
>
> Concerning the default size, maybe we can run some experiments and see how
> the system behaves with different pool sizes.
>
> Cheers,
> Till
>
> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen <[hidden email]> wrote:
>
>> Thanks Guowei, for the proposal.
>>
>> As discussed offline already, I think this sounds good.
>>
>> One thought is that 16m sounds very small for a default read buffer pool.
>> How risky do you think it is to increase this to 32m or 64m?
>>
>> Best,
>> Stephan
>>
>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma <[hidden email]> wrote:
>>
>>> Hi, all
>>>
>>>
>>> In the Flink 1.12 we introduce the TM merge shuffle. But the
>>> out-of-the-box experience of using TM merge shuffle is not very good. The
>>> main reason is that the default configuration always makes users encounter
>>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>>> to avoid the problem.
>>> Goals
>>>
>>>    1. Don't affect the streaming and pipelined-shuffle-only batch
>>>    setups.
>>>    2. Don't mix memory with different life cycle in the same pool.
>>>    E.g., write buffers needed by running tasks and read buffer needed even
>>>    after tasks being finished.
>>>    3. User can use the TM merge shuffle with default memory
>>>    configurations. (May need further tunings for performance optimization, but
>>>    should not fail with the default configurations.)
>>>
>>> Proposal
>>>
>>>    1. Introduce a configuration `taskmanager.memory.network.batch-read`
>>>    to specify the size of this memory pool. The default value is 16m.
>>>    2. Allocate the pool lazily. It means that the memory pool would be
>>>    allocated when the TM merge shuffle is used at the first time.
>>>    3. This pool size will not be add up to the TM's total memory size,
>>>    but will be considered part of
>>>    `taskmanager.memory.framework.off-heap.size`. We need to check that the
>>>    pool size is not larger than the framework off-heap size, if TM merge
>>>    shuffle is enabled.
>>>
>>>
>>> In this default configuration, the allocation of the memory pool is
>>> almost impossible to fail. Currently the default framework’s off-heap
>>> memory is 128m, which is mainly used by Netty. But after we introduced zero
>>> copy, the usage of it has been reduced, and you can refer to the detailed
>>> data [2].
>>> Known Limitation
>>> Usability for increasing the memory pool size
>>>
>>> In addition to increasing `taskmanager.memory.network.batch-read`, the
>>> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
>>> at the same time. It also means that once the user forgets this, it is
>>> likely to fail the check when allocating the memory pool.
>>>
>>>
>>> So in the following two situations, we will still prompt the user to
>>> increase the size of `framework.off-heap.size`.
>>>
>>>    1. `taskmanager.memory.network.batch-read` is bigger than
>>>    `taskmanager.memory.framework.off-heap.size`
>>>    2. Allocating the pool encounters the OOM.
>>>
>>>
>>> An alternative is that when the user adjusts the size of the memory
>>> pool, the system automatically adjusts it. But we are not entierly sure
>>> about this, given its implicity and complicating the memory configurations.
>>> Potential memory waste
>>>
>>> In the first step, the memory pool will not be released once allocated. This
>>> means in the first step, even if there is no subsequent batch job, the
>>> pooled memory cannot be used by other consumers.
>>>
>>>
>>> We are not releasing the pool in the first step due to the concern that
>>> frequently allocating/deallocating the entire pool may increase the GC
>>> pressue. Investitations on how to dynamically release the pool when it's no
>>> longer needed is considered a future follow-up.
>>>
>>>
>>> Looking forward to your feedback.
>>>
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-20740
>>>
>>> [2] https://github.com/apache/flink/pull/7368.
>>> Best,
>>> Guowei
>>>
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Xintong Song
It was originally designed that the framework vs. task memory are
distinguished by whether the memory owned by the slots. We have not
emphasized this in user documents because the dynamic slot allocation is
not yet available to the users, thus they do not need to worry about
whether the memory is owned by slots.

It's against intuition IMO that a memory pool configured via
`t.m.network.*` is actually subtracted from `t.m.framework.off-heap.*`.

Thank you~

Xintong Song



On Tue, Mar 9, 2021 at 5:28 PM Guowei Ma <[hidden email]> wrote:

> Hi, all
>
> Thanks all for your suggestions and feedback.
> I think it is a good idea that we increase the default size of the
> separated pool by testing. I am fine with adding the suffix(".size") to the
> config name, which makes it more clear to the user.
> But I am a little worried about adding a prefix("framework") because
> currently the tm shuffle service is only a shuffle-plugin, which is not a
> part of the framework. So maybe we could add a clear explanation in the
> document?
>
> Best,
> Guowei
>
>
> On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) <[hidden email]> wrote:
>
>> Thanks for the suggestions. I will do some tests and share the results
>> after the implementation is ready. Then we can give a proper default value.
>>
>> Best,
>> Yingjie
>>
>> ------------------------------------------------------------------
>> 发件人:Till Rohrmann<[hidden email]>
>> 日 期:2021年03月05日 23:03:10
>> 收件人:Stephan Ewen<[hidden email]>
>> 抄 送:dev<[hidden email]>; user<[hidden email]>; Xintong Song<
>> [hidden email]>; 曹英杰(北牧)<[hidden email]>; Guowei Ma<
>> [hidden email]>
>> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
>> shuffle
>>
>> Thanks for this proposal Guowei. +1 for it.
>>
>> Concerning the default size, maybe we can run some experiments and see
>> how the system behaves with different pool sizes.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen <[hidden email]> wrote:
>>
>>> Thanks Guowei, for the proposal.
>>>
>>> As discussed offline already, I think this sounds good.
>>>
>>> One thought is that 16m sounds very small for a default read buffer
>>> pool. How risky do you think it is to increase this to 32m or 64m?
>>>
>>> Best,
>>> Stephan
>>>
>>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma <[hidden email]> wrote:
>>>
>>>> Hi, all
>>>>
>>>>
>>>> In the Flink 1.12 we introduce the TM merge shuffle. But the
>>>> out-of-the-box experience of using TM merge shuffle is not very good. The
>>>> main reason is that the default configuration always makes users encounter
>>>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>>>> to avoid the problem.
>>>> Goals
>>>>
>>>>    1. Don't affect the streaming and pipelined-shuffle-only batch
>>>>    setups.
>>>>    2. Don't mix memory with different life cycle in the same pool.
>>>>    E.g., write buffers needed by running tasks and read buffer needed even
>>>>    after tasks being finished.
>>>>    3. User can use the TM merge shuffle with default memory
>>>>    configurations. (May need further tunings for performance optimization, but
>>>>    should not fail with the default configurations.)
>>>>
>>>> Proposal
>>>>
>>>>    1. Introduce a configuration
>>>>    `taskmanager.memory.network.batch-read` to specify the size of this memory
>>>>    pool. The default value is 16m.
>>>>    2. Allocate the pool lazily. It means that the memory pool would be
>>>>    allocated when the TM merge shuffle is used at the first time.
>>>>    3. This pool size will not be add up to the TM's total memory size,
>>>>    but will be considered part of
>>>>    `taskmanager.memory.framework.off-heap.size`. We need to check that the
>>>>    pool size is not larger than the framework off-heap size, if TM merge
>>>>    shuffle is enabled.
>>>>
>>>>
>>>> In this default configuration, the allocation of the memory pool is
>>>> almost impossible to fail. Currently the default framework’s off-heap
>>>> memory is 128m, which is mainly used by Netty. But after we introduced zero
>>>> copy, the usage of it has been reduced, and you can refer to the detailed
>>>> data [2].
>>>> Known Limitation
>>>> Usability for increasing the memory pool size
>>>>
>>>> In addition to increasing `taskmanager.memory.network.batch-read`, the
>>>> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
>>>> at the same time. It also means that once the user forgets this, it is
>>>> likely to fail the check when allocating the memory pool.
>>>>
>>>>
>>>> So in the following two situations, we will still prompt the user to
>>>> increase the size of `framework.off-heap.size`.
>>>>
>>>>    1. `taskmanager.memory.network.batch-read` is bigger than
>>>>    `taskmanager.memory.framework.off-heap.size`
>>>>    2. Allocating the pool encounters the OOM.
>>>>
>>>>
>>>> An alternative is that when the user adjusts the size of the memory
>>>> pool, the system automatically adjusts it. But we are not entierly sure
>>>> about this, given its implicity and complicating the memory configurations.
>>>> Potential memory waste
>>>>
>>>> In the first step, the memory pool will not be released once allocated. This
>>>> means in the first step, even if there is no subsequent batch job, the
>>>> pooled memory cannot be used by other consumers.
>>>>
>>>>
>>>> We are not releasing the pool in the first step due to the concern that
>>>> frequently allocating/deallocating the entire pool may increase the GC
>>>> pressue. Investitations on how to dynamically release the pool when it's no
>>>> longer needed is considered a future follow-up.
>>>>
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>>
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20740
>>>>
>>>> [2] https://github.com/apache/flink/pull/7368.
>>>> Best,
>>>> Guowei
>>>>
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Till Rohrmann
In reply to this post by Guowei Ma
Thanks for the update Yingjie. Then let's go with 32 MB I would say.

Concerning the name of the configuration option I see Xintong's point. If
the batch shuffle is subtracted from
`taskmanager.memory.framework.off-heap.size` because it is part of the
off-heap pool, then something like
`taskmanager.memory.framework.off-heap.batch-shuffle.size` would better
reflect this situation. On the other hand, this is quite a long
configuration name. But it is also a quite advanced configuration option
which, hopefully, should not be touched by too many of our users.

Cheers,
Till

On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧) <[hidden email]> wrote:

> Hi all,
>
> I have tested the default memory size with both batch (tpcds) and
> streaming jobs running in one session cluster (more than 100 queries). The
> result is:
> 1. All settings (16M, 32M and 64M) can work well without any OOM.
> 2. For streaming jobs running after batch jobs, there is no performance or
> stability regression.
> 2. 32M and 64M is better (over 10%) in terms of performance for the test
> batch job on HDD.
>
> Based on the above results, I think 32M is a good default choice, because
> the performance is good enough for the test job and compared to 64M, more
> direct memory can be used by netty and other components. What do you think?
>
> BTW, about the configuration key, do we reach a consensus? I am
> temporarily using taskmanager.memory.network.batch-shuffle-read.size in
> my PR now. Any suggestions about that?
>
> Best,
> Yingjie (Kevin)
>
> ------------------------------------------------------------------
> 发件人:Guowei Ma<[hidden email]>
> 日 期:2021年03月09日 17:28:35
> 收件人:曹英杰(北牧)<[hidden email]>
> 抄 送:Till Rohrmann<[hidden email]>; Stephan Ewen<[hidden email]>;
> dev<[hidden email]>; user<[hidden email]>; Xintong Song<
> [hidden email]>
> 主 题:Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM
> merge shuffle
>
> Hi, all
>
> Thanks all for your suggestions and feedback.
> I think it is a good idea that we increase the default size of the
> separated pool by testing. I am fine with adding the suffix(".size") to the
> config name, which makes it more clear to the user.
> But I am a little worried about adding a prefix("framework") because
> currently the tm shuffle service is only a shuffle-plugin, which is not a
> part of the framework. So maybe we could add a clear explanation in the
> document?
>
> Best,
> Guowei
>
>
> On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) <[hidden email]> wrote:
>
>> Thanks for the suggestions. I will do some tests and share the results
>> after the implementation is ready. Then we can give a proper default value.
>>
>> Best,
>> Yingjie
>>
>> ------------------------------------------------------------------
>> 发件人:Till Rohrmann<[hidden email]>
>> 日 期:2021年03月05日 23:03:10
>> 收件人:Stephan Ewen<[hidden email]>
>> 抄 送:dev<[hidden email]>; user<[hidden email]>; Xintong Song<
>> [hidden email]>; 曹英杰(北牧)<[hidden email]>; Guowei Ma<
>> [hidden email]>
>> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
>> shuffle
>>
>> Thanks for this proposal Guowei. +1 for it.
>>
>> Concerning the default size, maybe we can run some experiments and see
>> how the system behaves with different pool sizes.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen <[hidden email]> wrote:
>>
>>> Thanks Guowei, for the proposal.
>>>
>>> As discussed offline already, I think this sounds good.
>>>
>>> One thought is that 16m sounds very small for a default read buffer
>>> pool. How risky do you think it is to increase this to 32m or 64m?
>>>
>>> Best,
>>> Stephan
>>>
>>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma <[hidden email]> wrote:
>>>
>>>> Hi, all
>>>>
>>>>
>>>> In the Flink 1.12 we introduce the TM merge shuffle. But the
>>>> out-of-the-box experience of using TM merge shuffle is not very good. The
>>>> main reason is that the default configuration always makes users encounter
>>>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>>>> to avoid the problem.
>>>> Goals
>>>>
>>>>    1. Don't affect the streaming and pipelined-shuffle-only batch
>>>>    setups.
>>>>    2. Don't mix memory with different life cycle in the same pool.
>>>>    E.g., write buffers needed by running tasks and read buffer needed even
>>>>    after tasks being finished.
>>>>    3. User can use the TM merge shuffle with default memory
>>>>    configurations. (May need further tunings for performance optimization, but
>>>>    should not fail with the default configurations.)
>>>>
>>>> Proposal
>>>>
>>>>    1. Introduce a configuration
>>>>    `taskmanager.memory.network.batch-read` to specify the size of this memory
>>>>    pool. The default value is 16m.
>>>>    2. Allocate the pool lazily. It means that the memory pool would be
>>>>    allocated when the TM merge shuffle is used at the first time.
>>>>    3. This pool size will not be add up to the TM's total memory size,
>>>>    but will be considered part of
>>>>    `taskmanager.memory.framework.off-heap.size`. We need to check that the
>>>>    pool size is not larger than the framework off-heap size, if TM merge
>>>>    shuffle is enabled.
>>>>
>>>>
>>>> In this default configuration, the allocation of the memory pool is
>>>> almost impossible to fail. Currently the default framework’s off-heap
>>>> memory is 128m, which is mainly used by Netty. But after we introduced zero
>>>> copy, the usage of it has been reduced, and you can refer to the detailed
>>>> data [2].
>>>> Known Limitation
>>>> Usability for increasing the memory pool size
>>>>
>>>> In addition to increasing `taskmanager.memory.network.batch-read`, the
>>>> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
>>>> at the same time. It also means that once the user forgets this, it is
>>>> likely to fail the check when allocating the memory pool.
>>>>
>>>>
>>>> So in the following two situations, we will still prompt the user to
>>>> increase the size of `framework.off-heap.size`.
>>>>
>>>>    1. `taskmanager.memory.network.batch-read` is bigger than
>>>>    `taskmanager.memory.framework.off-heap.size`
>>>>    2. Allocating the pool encounters the OOM.
>>>>
>>>>
>>>> An alternative is that when the user adjusts the size of the memory
>>>> pool, the system automatically adjusts it. But we are not entierly sure
>>>> about this, given its implicity and complicating the memory configurations.
>>>> Potential memory waste
>>>>
>>>> In the first step, the memory pool will not be released once allocated. This
>>>> means in the first step, even if there is no subsequent batch job, the
>>>> pooled memory cannot be used by other consumers.
>>>>
>>>>
>>>> We are not releasing the pool in the first step due to the concern that
>>>> frequently allocating/deallocating the entire pool may increase the GC
>>>> pressue. Investitations on how to dynamically release the pool when it's no
>>>> longer needed is considered a future follow-up.
>>>>
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>>
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20740
>>>>
>>>> [2] https://github.com/apache/flink/pull/7368.
>>>> Best,
>>>> Guowei
>>>>
>>>
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Stephan Ewen
In reply to this post by Guowei Ma
Hi Yingjie!

Thanks for doing those experiments, the results look good. Let's go ahead
with 32M then.

Regarding the key, I am not strongly opinionated there. There are arguments
for both keys, (1) making the key part of the network pool config as you
did here or (2) making it part of the TM config (relative to framework
off-heap memory). I find (1) quite understandable, but it is personal
taste, so I can go with either option.

Best,
Stephan


On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧) <[hidden email]> wrote:

> Hi all,
>
> I have tested the default memory size with both batch (tpcds) and
> streaming jobs running in one session cluster (more than 100 queries). The
> result is:
> 1. All settings (16M, 32M and 64M) can work well without any OOM.
> 2. For streaming jobs running after batch jobs, there is no performance or
> stability regression.
> 2. 32M and 64M is better (over 10%) in terms of performance for the test
> batch job on HDD.
>
> Based on the above results, I think 32M is a good default choice, because
> the performance is good enough for the test job and compared to 64M, more
> direct memory can be used by netty and other components. What do you think?
>
> BTW, about the configuration key, do we reach a consensus? I am
> temporarily using taskmanager.memory.network.batch-shuffle-read.size in
> my PR now. Any suggestions about that?
>
> Best,
> Yingjie (Kevin)
>
> ------------------------------------------------------------------
> 发件人:Guowei Ma<[hidden email]>
> 日 期:2021年03月09日 17:28:35
> 收件人:曹英杰(北牧)<[hidden email]>
> 抄 送:Till Rohrmann<[hidden email]>; Stephan Ewen<[hidden email]>;
> dev<[hidden email]>; user<[hidden email]>; Xintong Song<
> [hidden email]>
> 主 题:Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM
> merge shuffle
>
> Hi, all
>
> Thanks all for your suggestions and feedback.
> I think it is a good idea that we increase the default size of the
> separated pool by testing. I am fine with adding the suffix(".size") to the
> config name, which makes it more clear to the user.
> But I am a little worried about adding a prefix("framework") because
> currently the tm shuffle service is only a shuffle-plugin, which is not a
> part of the framework. So maybe we could add a clear explanation in the
> document?
>
> Best,
> Guowei
>
>
> On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) <[hidden email]> wrote:
>
>> Thanks for the suggestions. I will do some tests and share the results
>> after the implementation is ready. Then we can give a proper default value.
>>
>> Best,
>> Yingjie
>>
>> ------------------------------------------------------------------
>> 发件人:Till Rohrmann<[hidden email]>
>> 日 期:2021年03月05日 23:03:10
>> 收件人:Stephan Ewen<[hidden email]>
>> 抄 送:dev<[hidden email]>; user<[hidden email]>; Xintong Song<
>> [hidden email]>; 曹英杰(北牧)<[hidden email]>; Guowei Ma<
>> [hidden email]>
>> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
>> shuffle
>>
>> Thanks for this proposal Guowei. +1 for it.
>>
>> Concerning the default size, maybe we can run some experiments and see
>> how the system behaves with different pool sizes.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen <[hidden email]> wrote:
>>
>>> Thanks Guowei, for the proposal.
>>>
>>> As discussed offline already, I think this sounds good.
>>>
>>> One thought is that 16m sounds very small for a default read buffer
>>> pool. How risky do you think it is to increase this to 32m or 64m?
>>>
>>> Best,
>>> Stephan
>>>
>>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma <[hidden email]> wrote:
>>>
>>>> Hi, all
>>>>
>>>>
>>>> In the Flink 1.12 we introduce the TM merge shuffle. But the
>>>> out-of-the-box experience of using TM merge shuffle is not very good. The
>>>> main reason is that the default configuration always makes users encounter
>>>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>>>> to avoid the problem.
>>>> Goals
>>>>
>>>>    1. Don't affect the streaming and pipelined-shuffle-only batch
>>>>    setups.
>>>>    2. Don't mix memory with different life cycle in the same pool.
>>>>    E.g., write buffers needed by running tasks and read buffer needed even
>>>>    after tasks being finished.
>>>>    3. User can use the TM merge shuffle with default memory
>>>>    configurations. (May need further tunings for performance optimization, but
>>>>    should not fail with the default configurations.)
>>>>
>>>> Proposal
>>>>
>>>>    1. Introduce a configuration
>>>>    `taskmanager.memory.network.batch-read` to specify the size of this memory
>>>>    pool. The default value is 16m.
>>>>    2. Allocate the pool lazily. It means that the memory pool would be
>>>>    allocated when the TM merge shuffle is used at the first time.
>>>>    3. This pool size will not be add up to the TM's total memory size,
>>>>    but will be considered part of
>>>>    `taskmanager.memory.framework.off-heap.size`. We need to check that the
>>>>    pool size is not larger than the framework off-heap size, if TM merge
>>>>    shuffle is enabled.
>>>>
>>>>
>>>> In this default configuration, the allocation of the memory pool is
>>>> almost impossible to fail. Currently the default framework’s off-heap
>>>> memory is 128m, which is mainly used by Netty. But after we introduced zero
>>>> copy, the usage of it has been reduced, and you can refer to the detailed
>>>> data [2].
>>>> Known Limitation
>>>> Usability for increasing the memory pool size
>>>>
>>>> In addition to increasing `taskmanager.memory.network.batch-read`, the
>>>> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
>>>> at the same time. It also means that once the user forgets this, it is
>>>> likely to fail the check when allocating the memory pool.
>>>>
>>>>
>>>> So in the following two situations, we will still prompt the user to
>>>> increase the size of `framework.off-heap.size`.
>>>>
>>>>    1. `taskmanager.memory.network.batch-read` is bigger than
>>>>    `taskmanager.memory.framework.off-heap.size`
>>>>    2. Allocating the pool encounters the OOM.
>>>>
>>>>
>>>> An alternative is that when the user adjusts the size of the memory
>>>> pool, the system automatically adjusts it. But we are not entierly sure
>>>> about this, given its implicity and complicating the memory configurations.
>>>> Potential memory waste
>>>>
>>>> In the first step, the memory pool will not be released once allocated. This
>>>> means in the first step, even if there is no subsequent batch job, the
>>>> pooled memory cannot be used by other consumers.
>>>>
>>>>
>>>> We are not releasing the pool in the first step due to the concern that
>>>> frequently allocating/deallocating the entire pool may increase the GC
>>>> pressue. Investitations on how to dynamically release the pool when it's no
>>>> longer needed is considered a future follow-up.
>>>>
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>>
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20740
>>>>
>>>> [2] https://github.com/apache/flink/pull/7368.
>>>> Best,
>>>> Guowei
>>>>
>>>
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM merge shuffle

Guowei Ma
Hi,
I discussed with Xingtong and Yingjie offline and we agreed that the name
`taskmanager.memory.framework.off-heap.batch-shuffle.size` can better
reflect the current memory usage. So we decided to use the name Till
suggested.
Thank you all for your valuable feedback.
Best,
Guowei


On Mon, Mar 22, 2021 at 5:21 PM Stephan Ewen <[hidden email]> wrote:

> Hi Yingjie!
>
> Thanks for doing those experiments, the results look good. Let's go ahead
> with 32M then.
>
> Regarding the key, I am not strongly opinionated there. There are
> arguments for both keys, (1) making the key part of the network pool config
> as you did here or (2) making it part of the TM config (relative to
> framework off-heap memory). I find (1) quite understandable, but it is
> personal taste, so I can go with either option.
>
> Best,
> Stephan
>
>
> On Mon, Mar 22, 2021 at 9:15 AM 曹英杰(北牧) <[hidden email]> wrote:
>
>> Hi all,
>>
>> I have tested the default memory size with both batch (tpcds) and
>> streaming jobs running in one session cluster (more than 100 queries). The
>> result is:
>> 1. All settings (16M, 32M and 64M) can work well without any OOM.
>> 2. For streaming jobs running after batch jobs, there is no performance
>> or stability regression.
>> 2. 32M and 64M is better (over 10%) in terms of performance for the test
>> batch job on HDD.
>>
>> Based on the above results, I think 32M is a good default choice, because
>> the performance is good enough for the test job and compared to 64M, more
>> direct memory can be used by netty and other components. What do you think?
>>
>> BTW, about the configuration key, do we reach a consensus? I am
>> temporarily using taskmanager.memory.network.batch-shuffle-read.size in
>> my PR now. Any suggestions about that?
>>
>> Best,
>> Yingjie (Kevin)
>>
>> ------------------------------------------------------------------
>> 发件人:Guowei Ma<[hidden email]>
>> 日 期:2021年03月09日 17:28:35
>> 收件人:曹英杰(北牧)<[hidden email]>
>> 抄 送:Till Rohrmann<[hidden email]>; Stephan Ewen<[hidden email]>;
>> dev<[hidden email]>; user<[hidden email]>; Xintong Song<
>> [hidden email]>
>> 主 题:Re: Re: [DISCUSSION] Introduce a separated memory pool for the TM
>> merge shuffle
>>
>> Hi, all
>>
>> Thanks all for your suggestions and feedback.
>> I think it is a good idea that we increase the default size of the
>> separated pool by testing. I am fine with adding the suffix(".size") to the
>> config name, which makes it more clear to the user.
>> But I am a little worried about adding a prefix("framework") because
>> currently the tm shuffle service is only a shuffle-plugin, which is not a
>> part of the framework. So maybe we could add a clear explanation in the
>> document?
>>
>> Best,
>> Guowei
>>
>>
>> On Tue, Mar 9, 2021 at 3:58 PM 曹英杰(北牧) <[hidden email]> wrote:
>>
>>> Thanks for the suggestions. I will do some tests and share the results
>>> after the implementation is ready. Then we can give a proper default value.
>>>
>>> Best,
>>> Yingjie
>>>
>>> ------------------------------------------------------------------
>>> 发件人:Till Rohrmann<[hidden email]>
>>> 日 期:2021年03月05日 23:03:10
>>> 收件人:Stephan Ewen<[hidden email]>
>>> 抄 送:dev<[hidden email]>; user<[hidden email]>; Xintong
>>> Song<[hidden email]>; 曹英杰(北牧)<[hidden email]>; Guowei
>>> Ma<[hidden email]>
>>> 主 题:Re: [DISCUSSION] Introduce a separated memory pool for the TM merge
>>> shuffle
>>>
>>> Thanks for this proposal Guowei. +1 for it.
>>>
>>> Concerning the default size, maybe we can run some experiments and see
>>> how the system behaves with different pool sizes.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Mar 5, 2021 at 2:45 PM Stephan Ewen <[hidden email]> wrote:
>>>
>>>> Thanks Guowei, for the proposal.
>>>>
>>>> As discussed offline already, I think this sounds good.
>>>>
>>>> One thought is that 16m sounds very small for a default read buffer
>>>> pool. How risky do you think it is to increase this to 32m or 64m?
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>> On Fri, Mar 5, 2021 at 4:33 AM Guowei Ma <[hidden email]> wrote:
>>>>
>>>>> Hi, all
>>>>>
>>>>>
>>>>> In the Flink 1.12 we introduce the TM merge shuffle. But the
>>>>> out-of-the-box experience of using TM merge shuffle is not very good. The
>>>>> main reason is that the default configuration always makes users encounter
>>>>> OOM [1]. So we hope to introduce a managed memory pool for TM merge shuffle
>>>>> to avoid the problem.
>>>>> Goals
>>>>>
>>>>>    1. Don't affect the streaming and pipelined-shuffle-only batch
>>>>>    setups.
>>>>>    2. Don't mix memory with different life cycle in the same pool.
>>>>>    E.g., write buffers needed by running tasks and read buffer needed even
>>>>>    after tasks being finished.
>>>>>    3. User can use the TM merge shuffle with default memory
>>>>>    configurations. (May need further tunings for performance optimization, but
>>>>>    should not fail with the default configurations.)
>>>>>
>>>>> Proposal
>>>>>
>>>>>    1. Introduce a configuration
>>>>>    `taskmanager.memory.network.batch-read` to specify the size of this memory
>>>>>    pool. The default value is 16m.
>>>>>    2. Allocate the pool lazily. It means that the memory pool would
>>>>>    be allocated when the TM merge shuffle is used at the first time.
>>>>>    3. This pool size will not be add up to the TM's total memory
>>>>>    size, but will be considered part of
>>>>>    `taskmanager.memory.framework.off-heap.size`. We need to check that the
>>>>>    pool size is not larger than the framework off-heap size, if TM merge
>>>>>    shuffle is enabled.
>>>>>
>>>>>
>>>>> In this default configuration, the allocation of the memory pool is
>>>>> almost impossible to fail. Currently the default framework’s off-heap
>>>>> memory is 128m, which is mainly used by Netty. But after we introduced zero
>>>>> copy, the usage of it has been reduced, and you can refer to the detailed
>>>>> data [2].
>>>>> Known Limitation
>>>>> Usability for increasing the memory pool size
>>>>>
>>>>> In addition to increasing `taskmanager.memory.network.batch-read`, the
>>>>> user may also need to adjust `taskmanager.memory.framework.off-heap.size`
>>>>> at the same time. It also means that once the user forgets this, it is
>>>>> likely to fail the check when allocating the memory pool.
>>>>>
>>>>>
>>>>> So in the following two situations, we will still prompt the user to
>>>>> increase the size of `framework.off-heap.size`.
>>>>>
>>>>>    1. `taskmanager.memory.network.batch-read` is bigger than
>>>>>    `taskmanager.memory.framework.off-heap.size`
>>>>>    2. Allocating the pool encounters the OOM.
>>>>>
>>>>>
>>>>> An alternative is that when the user adjusts the size of the memory
>>>>> pool, the system automatically adjusts it. But we are not entierly sure
>>>>> about this, given its implicity and complicating the memory configurations.
>>>>> Potential memory waste
>>>>>
>>>>> In the first step, the memory pool will not be released once
>>>>> allocated. This means in the first step, even if there is no
>>>>> subsequent batch job, the pooled memory cannot be used by other consumers.
>>>>>
>>>>>
>>>>> We are not releasing the pool in the first step due to the concern
>>>>> that frequently allocating/deallocating the entire pool may increase the GC
>>>>> pressue. Investitations on how to dynamically release the pool when it's no
>>>>> longer needed is considered a future follow-up.
>>>>>
>>>>>
>>>>> Looking forward to your feedback.
>>>>>
>>>>>
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-20740
>>>>>
>>>>> [2] https://github.com/apache/flink/pull/7368.
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>
>>>
>>