[DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

15 messages

Yangze Guo
Hi there,

We would like to start a discussion thread on "FLIP-169: DataStream
API for Fine-Grained Resource Requirements"[1], where we propose the
DataStream API for specifying fine-grained resource requirements in
StreamExecutionEnvironment.

Please find more details in the FLIP wiki document [1]. Looking
forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements


Best,
Yangze Guo

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Xintong Song
Thanks Yangze for preparing the FLIP.

The proposed changes look good to me.

As you've mentioned in the implementation plan, I believe one of the most
important tasks of this FLIP is to have the feature well documented. It
would be really nice if we can keep that in mind and start drafting the
documentation early.

Thank you~

Xintong Song



On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <[hidden email]> wrote:

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

wenlong.lwl
Thanks Yangze for the FLIP. It is great for users to be able to declare
fine-grained resource requirements for their jobs.

I have one minor suggestion: can we support setting resource requirements
via configuration? Currently, most of the options in the execution config
can be set via configuration, and users are very likely to need to adjust
resources according to the performance of their job during debugging.
Providing a configuration-based way would make this more convenient.

Bests,
Wenlong Lyu

On Thu, 3 Jun 2021 at 15:59, Xintong Song <[hidden email]> wrote:

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yangze Guo
Thanks for the feedback, Xintong and Wenlong!

@Wenlong
I think that is a good idea; adjusting the resources without re-compiling
the job will facilitate the tuning process.
We can define a pattern "slot-sharing-group.resource.{ssg name}"
(any proposal for the prefix naming is welcome) for the resource spec
config of a slot sharing group. Then, users can set the ResourceSpec of
SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
heap: 100m, off-heap: 100m....}". WDYT?
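A minimal sketch of how such keys could be collected per SSG, assuming the value is a flat, comma-separated `key: value` list like the example above (no nested values, no commas inside values). The class `SsgResourceConfig` and its methods are hypothetical illustrations, not Flink API; values are kept as raw strings rather than parsed into a ResourceSpec.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink API): collects per-SSG resource specs from
 * flat config entries of the form
 * "slot-sharing-group.resource.<ssg name>: {key: value, ...}".
 */
class SsgResourceConfig {
    static final String PREFIX = "slot-sharing-group.resource.";

    /** Returns ssg name -> (resource name -> raw value string). */
    static Map<String, Map<String, String>> parse(Map<String, String> conf) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            if (!e.getKey().startsWith(PREFIX)) {
                continue; // unrelated option, e.g. "parallelism.default"
            }
            String ssgName = e.getKey().substring(PREFIX.length());
            result.put(ssgName, parseSpec(e.getValue()));
        }
        return result;
    }

    /** Parses "{cpu: 1.0, heap: 100m}" into an insertion-ordered map. */
    static Map<String, String> parseSpec(String value) {
        String body = value.trim();
        if (body.startsWith("{") && body.endsWith("}")) {
            body = body.substring(1, body.length() - 1);
        }
        Map<String, String> spec = new LinkedHashMap<>();
        for (String pair : body.split(",")) {
            if (pair.trim().isEmpty()) {
                continue; // tolerate trailing commas
            }
            String[] kv = pair.split(":", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Malformed entry: " + pair);
            }
            spec.put(kv[0].trim(), kv[1].trim());
        }
        return spec;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("parallelism.default", "4");
        conf.put(PREFIX + "ssg1", "{cpu: 1.0, heap: 100m, off-heap: 100m}");
        // prints {ssg1={cpu=1.0, heap=100m, off-heap=100m}}
        System.out.println(parse(conf));
    }
}
```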


Best,
Yangze Guo

On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <[hidden email]> wrote:

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yangze Guo
@Wenlong
On further consideration, the config option approach I mentioned
above might not be appropriate. The resource requirements for SSGs
should be a job-level configuration and should not be set in
flink-conf.

I think we can define a JSON format for the resource requirements of a
specific job, in which the ResourceSpecs are mapped by SSG name. Users
can then configure the file path of that JSON. The JSON will only be
parsed at runtime, which allows users to tune it without re-compiling
the job.

We can add another #setSlotSharingGroupResources for configuring the
file path of that JSON:
```
/**
 * Specify fine-grained resource requirements for slot sharing groups with
 * the given resource JSON file. The existing resource requirement of the
 * same slot sharing group will be replaced.
 */
public StreamExecutionEnvironment setSlotSharingGroupResources(
        String pathToResourceJson);
```
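The Javadoc above says that requirements loaded from the file replace the existing requirement of the same slot sharing group. A minimal sketch of that rule, assuming requirements are keyed by SSG name and replaced per group as a whole; `ResourceMerge`, `applyFromFile` and the string-valued specs (standing in for ResourceSpec) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the "replace per slot sharing group" rule. */
class ResourceMerge {

    /**
     * Applies requirements loaded from a resource file on top of the ones
     * set in code. An entry for the same SSG name replaces the programmatic
     * one as a whole (no field-by-field merge); SSGs absent from the file
     * keep their programmatic requirement. Inputs are not modified.
     */
    static Map<String, String> applyFromFile(
            Map<String, String> fromCode, Map<String, String> fromFile) {
        Map<String, String> merged = new HashMap<>(fromCode);
        merged.putAll(fromFile); // file entry wins for the same SSG name
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> fromCode = new HashMap<>();
        fromCode.put("ssg1", "cpu=1.0,heap=100m");
        fromCode.put("ssg2", "cpu=2.0,heap=200m");

        Map<String, String> fromFile = new HashMap<>();
        fromFile.put("ssg1", "cpu=0.5");

        Map<String, String> merged = applyFromFile(fromCode, fromFile);
        System.out.println(merged.get("ssg1")); // cpu=0.5 (replaced entirely)
        System.out.println(merged.get("ssg2")); // cpu=2.0,heap=200m (kept)
    }
}
```

Whole-group replacement means a field set in code but missing from the file (here `heap` for `ssg1`) is dropped; field-by-field merging would be an alternative design.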

WDYT?

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo <[hidden email]> wrote:

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Xintong Song
I think being able to specify fine-grained resource requirements without
having to change the code and recompile the job is indeed a good idea. It
definitely improves usability.

However, this requires a more careful design, which probably deserves a
separate thread. It'd be good to have that discussion, but we should
probably not block this feature on it.

One idea concerning the configuration approach: as Yangze said, Flink
configuration options are supposed to take effect at the cluster level. For
updating job-level specifics that are not suitable to be introduced as
config options, currently the only way is to pass them as program arguments.
Would it make sense to introduce a general approach for overwriting such
job specifics without re-compiling the job?

Thank you~

Xintong Song



On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo <[hidden email]> wrote:

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yangze Guo
@Xintong
> introduce a general approach for overwriting such job specifics without re-compiling the job
I think that would be a good direction. Just sharing my two cents on this
topic. I'd divide the job-level specifics into two categories:
- Specifics which affect how Flink executes the job, e.g.
"parallelism.default". Currently, most of these specifics have a
corresponding config option.
- Job-specific arguments, e.g. the "input" of our WordCount example.
These can only be passed as program arguments.
It might be good to have a general approach for overriding all of the
above. One preliminary idea is introducing a separate
"job-conf.yaml".
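A minimal sketch of the layering idea, assuming the separate "job-conf.yaml" has already been parsed into a flat map and that job-level values take precedence over cluster-level ones. `JobConfLayering` is hypothetical and the `input` path is a made-up placeholder; the example covers both categories above, an execution specific that has a config option counterpart and a job argument that has none.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch: a job-level layer (e.g. a parsed "job-conf.yaml")
 * overrides cluster-level options and may also carry job-specific
 * arguments that have no cluster-level counterpart.
 */
class JobConfLayering {
    private final Map<String, String> clusterConf; // e.g. from flink-conf.yaml
    private final Map<String, String> jobConf;     // e.g. from job-conf.yaml

    JobConfLayering(Map<String, String> clusterConf, Map<String, String> jobConf) {
        this.clusterConf = new HashMap<>(clusterConf);
        this.jobConf = new HashMap<>(jobConf);
    }

    /** Job-level value wins; otherwise fall back to the cluster-level value. */
    Optional<String> get(String key) {
        String jobValue = jobConf.get(key);
        if (jobValue != null) {
            return Optional.of(jobValue);
        }
        return Optional.ofNullable(clusterConf.get(key));
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new HashMap<>();
        cluster.put("parallelism.default", "1");

        Map<String, String> job = new HashMap<>();
        job.put("parallelism.default", "8");  // category 1: execution specific
        job.put("input", "/tmp/words.txt");   // category 2: placeholder job argument

        JobConfLayering conf = new JobConfLayering(cluster, job);
        System.out.println(conf.get("parallelism.default").orElse("?")); // 8
        System.out.println(conf.get("input").orElse("?"));
    }
}
```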

All in all, I agree that this topic requires a more careful design and
deserves a separate discussion thread.

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 1:47 PM Xintong Song <[hidden email]> wrote:

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yang Wang
In reply to this post by Xintong Song
Thanks @Yangze for preparing this FLIP.

I think this is a good starting point for community users to get a taste
of fine-grained resource management, which we all believe could improve
Flink job stability and cluster utilization.

I have a simple question about extended resources. Is it possible to
combine extended resources with fine-grained resource management? Besides
GPUs, FPGAs and other new computing devices, disk may be a more common use
case. For example, different SSGs may have different disk requirements
depending on their state. So we need to allocate enough ephemeral storage
for every TaskManager pod in a Kubernetes deployment. Otherwise, the pod
might be evicted for exceeding its limits.


Best,
Yang


On Tue, Jun 8, 2021 at 1:47 PM Xintong Song <[hidden email]> wrote:

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yangze Guo
@Yang
In short, external resources will participate in resource deduction and
be logically ensured, but with the current default resource allocation
strategy, external resources must still be requested through config options.
In FLIP-56, we abstract the logic of resource allocation into the
`ResourceAllocationStrategy`. Currently, with its default implementation,
the ResourceManager still allocates TMs with the same resource spec, and
their external resources are configured through config options as well.
So, in your case, you need to define "external-resources" and
"external-resource.disk.amount". Then, all the disk requirements defined
in the SSGs will be ensured, but only logically, as there is no slot-level
isolation. If the disk space of a TaskManager cannot fulfill the disk
requirement, the RM will allocate a new one.
In the future, we'd like to introduce a `ResourceAllocationStrategy`
which allocates heterogeneous TMs according to the requirements. Then,
users will only need to define the driver of the external resources when
needed.
Also, regarding resource isolation, we may in the future provide a
fine-grained mode in which each slot can only fetch the information of the
external resources it requires. But that is out of the scope of this FLIP.
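A simplified sketch of the "logically ensured" behavior described above, assuming every TM is started with the same configured disk amount (as with the default strategy) and that requirements are placed first-fit in integer units. `DiskDeduction` and the first-fit placement are hypothetical and are not the ResourceManager's actual slot matching algorithm.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: each TaskManager offers the same logical disk amount
 * (configured per TM via config options). Requirements are deducted from a
 * TM's remaining amount; if no existing TM can fulfill one, a new TM is
 * allocated. This is bookkeeping only: nothing enforces the amount at
 * runtime, since there is no slot-level isolation.
 */
class DiskDeduction {

    /** Returns the remaining disk of each TM after placing all requirements. */
    static List<Integer> place(int diskPerTm, int[] slotDiskRequirements) {
        List<Integer> remaining = new ArrayList<>();
        for (int required : slotDiskRequirements) {
            if (required > diskPerTm) {
                throw new IllegalArgumentException(
                        "Requirement " + required + " exceeds a whole TM (" + diskPerTm + ")");
            }
            boolean placed = false;
            for (int i = 0; i < remaining.size() && !placed; i++) {
                if (remaining.get(i) >= required) {
                    remaining.set(i, remaining.get(i) - required); // logical deduction
                    placed = true;
                }
            }
            if (!placed) {
                remaining.add(diskPerTm - required); // allocate a new TM
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        // Each TM offers 10 units of disk; three slots need 6, 3 and 5 units.
        System.out.println(place(10, new int[] {6, 3, 5})); // [1, 5]
    }
}
```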

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 4:20 PM Yang Wang <[hidden email]> wrote:

>
> Thanks @Yangze for preparing this FLIP.
>
> I think this is a good start point for the community users to have a taste
> on the fine-grained
> resource management, which we all believe it could improve the Flink job
> stability and
> cluster utilization.
>
> I have a simple question about the extended resources. It is possible to
> combine extended resources
> with fine-grained resource management. Except for the GPU, FPGA and other
> new computing devices,
> maybe the disk resource is a more general use case. For example, different
> SSG may have various
> disk requirements based on the state. So we need to allocate enough
> ephemeral storage resource for every
> TaskManager pod in Kubernetes deployment. Otherwise, it might be evicted
> due to running out of limits.
>
>
> Best,
> Yang
>
>
> Xintong Song <[hidden email]> 于2021年6月8日周二 下午1:47写道:
>
> > I think being able to specify fine grained resource requirements without
> > having to change the codes and recompile the job is indeed a good idea. It
> > definitely improves the usability.
> >
> > However, this requires more careful designs, which probably deserves a
> > separate thread. I'd be good to have that discussion, but maybe not block
> > this feature on that.
> >
> > One idea concerning the configuration approach: As Yangze said, flink
> > configuration options are supposed to take effect at cluster level. For
> > updating job level specifics that are not suitable to be introduced as a
> > config option, currently the only way is to pass them as program arguments.
> > Would it make sense to introduce a general approach for overwriting such
> > job specifics without re-compiling the job?
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo <[hidden email]> wrote:
> >
> > > @Wenlong
> > > After another consideration, the config option approach I mentioned
> > > above might not be appropriate. The resource requirements for SSG
> > > should be a job level configuration and should no be set in the
> > > flink-conf.
> > >
> > > I think we can define a JSON format, which would be the ResourceSpecs
> > > mapped by the name of SSGs, for the resource requirements of a
> > > specific job. Then, we allow user to configure the file path of that
> > > JSON. The JSON will be only parsed in runtime, which allows user to
> > > tune it without re-compiling the job.
> > >
> > > We can add another #setSlotSharingGroupResources for configuring the
> > > file path of that JSON:
> > > ```
> > > /**
> > >  * Specify fine-grained resource requirements for slot sharing groups
> > > with the given resource JSON file. The existing resource
> > >  * requirement of the same slot sharing group will be replaced.
> > >  */
> > > public StreamExecutionEnvironment setSlotSharingGroupResources(
> > >         String pathToResourceJson);
> > > ```
> > >
> > > WDYT?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo <[hidden email]> wrote:
> > > >
> > > > Thanks for the feedbacks, Xintong and Wenlong!
> > > >
> > > > @Wenlong
> > > > I think that is a good idea; adjusting the resources without re-compiling
> > > > the job will facilitate the tuning process.
> > > > We can define a pattern "slot-sharing-group.resource.{ssg name}"
> > > > (any proposal for the prefix naming is welcome) for the resource spec
> > > > config of a slot sharing group. Then, users can set the ResourceSpec of
> > > > SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
> > > > heap: 100m, off-heap: 100m....}". WDYT?
> > > >
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <[hidden email]> wrote:
> > > > >
> > > > > Thanks Yangze for the FLIP, it is great for users to be able to declare
> > > > > the fine-grained resource requirements for the job.
> > > > >
> > > > > I have one minor suggestion: can we support setting resource
> > > > > requirements through configuration? Currently most of the options in
> > > > > the execution config can be set through configuration, and it is very
> > > > > likely that users need to adjust the resources according to the
> > > > > performance of their job during debugging. Providing a configuration
> > > > > option will make this more convenient.
> > > > >
> > > > > Bests,
> > > > > Wenlong Lyu
> > > > >
> > > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song <[hidden email]> wrote:
> > > > >
> > > > > > Thanks Yangze for preparing the FLIP.
> > > > > >
> > > > > > The proposed changes look good to me.
> > > > > >
> > > > > > As you've mentioned in the implementation plan, I believe one of the
> > > > > > most important tasks of this FLIP is to have the feature well
> > > > > > documented. It would be really nice if we can keep that in mind and
> > > > > > start drafting the documentation early.
> > > > > >
> > > > > > Thank you~
> > > > > >
> > > > > > Xintong Song
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <[hidden email]> wrote:
> > > > > >
> > > > > > > Hi, there,
> > > > > > >
> > > > > > > We would like to start a discussion thread on "FLIP-169: DataStream
> > > > > > > API for Fine-Grained Resource Requirements"[1], where we propose the
> > > > > > > DataStream API for specifying fine-grained resource requirements in
> > > > > > > StreamExecutionEnvironment.
> > > > > > >
> > > > > > > Please find more details in the FLIP wiki document [1]. Looking
> > > > > > > forward to your feedback.
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > >
> > >
> >
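For the per-group key pattern proposed in the quoted exchange above ("slot-sharing-group.resource.{ssg name}" with a value such as "{cpu: 1.0, heap: 100m, off-heap: 100m}"), here is a rough, standalone Java sketch of how one such entry could be split into a group name and its fields. The brace syntax, the binary k/m/g size suffixes and all class and method names are assumptions made for illustration; the elided "...." fields are not guessed, and this is not how Flink itself parses configuration.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class SsgConfigPatternSketch {

    // Hypothetical key prefix, taken from the pattern floated in the thread.
    static final String PREFIX = "slot-sharing-group.resource.";

    /** Returns the group name for keys such as "slot-sharing-group.resource.ssg1", else null. */
    static String groupName(String key) {
        return key.startsWith(PREFIX) ? key.substring(PREFIX.length()) : null;
    }

    /** Splits "{cpu: 1.0, heap: 100m}" into an insertion-ordered field map. */
    static Map<String, String> parseFields(String value) {
        String body = value.trim();
        if (!body.startsWith("{") || !body.endsWith("}")) {
            throw new IllegalArgumentException("Expected {field: value, ...} but got: " + value);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (String entry : body.substring(1, body.length() - 1).split(",")) {
            if (entry.trim().isEmpty()) {
                continue; // tolerate trailing commas
            }
            String[] kv = entry.split(":", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Missing ':' in field: " + entry);
            }
            fields.put(kv[0].trim(), kv[1].trim());
        }
        return fields;
    }

    /** Converts "512k", "100m", "1g" (binary units) or a bare byte count to bytes. */
    static long toBytes(String size) {
        String s = size.trim().toLowerCase(Locale.ROOT);
        long multiplier;
        switch (s.charAt(s.length() - 1)) {
            case 'k': multiplier = 1L << 10; break;
            case 'm': multiplier = 1L << 20; break;
            case 'g': multiplier = 1L << 30; break;
            default: multiplier = 1L;
        }
        String digits = multiplier == 1L ? s : s.substring(0, s.length() - 1);
        return Long.parseLong(digits.trim()) * multiplier;
    }

    public static void main(String[] args) {
        String key = "slot-sharing-group.resource.ssg1";
        Map<String, String> fields = parseFields("{cpu: 1.0, heap: 100m, off-heap: 100m}");
        System.out.println(groupName(key) + ": cpu=" + Double.parseDouble(fields.get("cpu"))
                + ", heapBytes=" + toBytes(fields.get("heap"))
                + ", offHeapBytes=" + toBytes(fields.get("off-heap")));
    }
}
```

Note that the thread later moved away from putting per-job requirements into flink-conf, so this only illustrates the idea as originally floated.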

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Arvid Heise
Hi Yangze,

I like the general approach of binding requirements to slot sharing groups. I
think the current approach is also flexible enough that a user could simply
use ParameterTool or similar to read config values and wire them into their
slot sharing groups, so that different requirements can be tested without
recompilation. So I don't see an immediate need to provide a generic
solution for YAML configuration for now.
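To make the ParameterTool point concrete, here is a minimal, Flink-free Java sketch of reading per-group values from job arguments, so they can be changed between runs without recompiling. In an actual job one would use ParameterTool.fromArgs(args) with its getDouble(key, defaultValue) and getInt(key, defaultValue) getters; the hand-rolled parseArgs below stands in for it only so the example runs without Flink on the classpath. The argument names (ssg1.cpu, ssg1.heap-mb) and the GroupResources class are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsDrivenResources {

    /** Hypothetical per-group values read from job arguments; not a Flink class. */
    static final class GroupResources {
        final double cpuCores;
        final int taskHeapMb;

        GroupResources(double cpuCores, int taskHeapMb) {
            this.cpuCores = cpuCores;
            this.taskHeapMb = taskHeapMb;
        }

        @Override
        public String toString() {
            return "cpu=" + cpuCores + ", taskHeapMb=" + taskHeapMb;
        }
    }

    /** Collects "--key value" pairs; a simplified stand-in for ParameterTool.fromArgs. */
    static Map<String, String> parseArgs(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Arguments must come in --key value pairs");
        }
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    /** Reads "<group>.cpu" and "<group>.heap-mb", falling back to the given defaults. */
    static GroupResources resourcesFor(
            String group, Map<String, String> params, double defaultCpu, int defaultHeapMb) {
        String cpu = params.get(group + ".cpu");
        String heap = params.get(group + ".heap-mb");
        return new GroupResources(
                cpu == null ? defaultCpu : Double.parseDouble(cpu),
                heap == null ? defaultHeapMb : Integer.parseInt(heap));
    }

    public static void main(String[] args) {
        // Simulates launching the job with: --ssg1.cpu 2.0 --ssg1.heap-mb 512
        Map<String, String> params =
                parseArgs(new String[] {"--ssg1.cpu", "2.0", "--ssg1.heap-mb", "512"});
        System.out.println("ssg1: " + resourcesFor("ssg1", params, 1.0, 100));
        // ssg2 is not mentioned in the arguments, so the defaults apply.
        System.out.println("ssg2: " + resourcesFor("ssg2", params, 1.0, 100));
    }
}
```

The resulting values would then be handed to whichever resource-setting method the FLIP ends up providing.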

Looking at the programmatic interface, though, I think we could improve it
quite a bit, and I haven't seen these alternatives considered in the
FLIP:
1) Add a new class SlotSharingGroup that incorporates all ResourceSpec
properties. Instead of using group names, the user could directly configure
such an object.

        SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name could also be omitted and auto-generated
        ssg1.setCPUCores(4);
        ...
        DataStream<Tuple2<String, Integer>> grades =
                GradeSource
                        .getSource(env, rate)
                        .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                        .slotSharingGroup(ssg1);
        DataStream<Tuple2<String, Integer>> salaries =
                SalarySource.getSource(env, rate)
                        .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
                        .slotSharingGroup(ssg2);

        // run the actual window join program with the same slot sharing group as grades
        DataStream<Tuple3<String, Integer, Integer>> joinedStream =
                runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);

Note that we could make it backward compatible by changing the proposed
StreamExecutionEnvironment#setSlotSharingGroupResource to
StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and
then using the string name for further reference.

2) I'm also not sure about
StreamExecutionEnvironment#setSlotSharingGroupResources. What's the benefit
of the Map version over the simple setter? Even if the user has a map
slotSharingGroupResources, they could simply call
slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);

3) Is defining the ExternalResource part of this FLIP? I don't see a
Public* class for it yet. I'd also be fine with cutting the scope of this
FLIP, removing it for now, and annotating ResourceSpec/SlotSharingGroup as
evolving.

4) We should probably use a builder pattern around
ResourceSpec/SlotSharingGroup, as the current ResourceSpec does. I don't
think we need to fully specify that in the FLIP, but it would be good to at
least say how users should create them.
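A minimal sketch of the shape suggested in points 1, 2 and 4: an immutable group object created through a fluent builder, an environment-side registry keyed by group name (so operators that only reference a string name still resolve to the registered resources), and a plain forEach instead of a Map-taking overload. Everything here (Group, Builder, Environment, lookup and the builder method names) is a hypothetical stand-in written for this example, not Flink's API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class SlotSharingGroupSketch {

    /** Hypothetical immutable slot sharing group: a name plus its resource requirements. */
    static final class Group {
        final String name;
        final double cpuCores;
        final int taskHeapMb;

        private Group(Builder builder) {
            this.name = builder.name;
            this.cpuCores = builder.cpuCores;
            this.taskHeapMb = builder.taskHeapMb;
        }

        static Builder newBuilder(String name) {
            return new Builder(name);
        }

        @Override
        public String toString() {
            return name + "{cpu=" + cpuCores + ", taskHeapMb=" + taskHeapMb + "}";
        }
    }

    /** Fluent builder; all validation happens once, in build(). */
    static final class Builder {
        private final String name;
        private double cpuCores = -1;
        private int taskHeapMb = -1;

        private Builder(String name) {
            this.name = Objects.requireNonNull(name, "name");
        }

        Builder cpuCores(double cores) {
            this.cpuCores = cores;
            return this;
        }

        Builder taskHeapMb(int mb) {
            this.taskHeapMb = mb;
            return this;
        }

        Group build() {
            if (cpuCores <= 0 || taskHeapMb <= 0) {
                throw new IllegalStateException(
                        "cpuCores and taskHeapMb must be positive for group " + name);
            }
            return new Group(this);
        }
    }

    /** Stand-in for the environment: groups are registered once, then referenced by name. */
    static final class Environment {
        private final Map<String, Group> registered = new HashMap<>();

        Environment registerSlotSharingGroup(Group group) {
            registered.put(group.name, group); // a later registration replaces an earlier one
            return this;
        }

        /** Returns the registered group, or null if the name has no explicit requirements. */
        Group lookup(String name) {
            return registered.get(name);
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment();
        env.registerSlotSharingGroup(Group.newBuilder("ssg-1").cpuCores(4).taskHeapMb(512).build());

        // Point 2: registering a whole map needs no dedicated overload; forEach suffices.
        Map<String, Group> more = new LinkedHashMap<>();
        more.put("ssg-2", Group.newBuilder("ssg-2").cpuCores(1).taskHeapMb(128).build());
        more.values().forEach(env::registerSlotSharingGroup);

        System.out.println(env.lookup("ssg-1"));
        System.out.println(env.lookup("ssg-2"));
        System.out.println(env.lookup("unregistered"));
    }
}
```

Validating in build() rather than in each setter lets the builder reject an incomplete group in one place, which is one argument for the builder over plain setters.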




Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yangze Guo
Thanks for the valuable suggestion, Arvid.

1) Yes, we can add a new SlotSharingGroup which includes the name and
its resources. After that, we would have two interfaces for configuring the
slot sharing group of an operator:
- #slotSharingGroup(String name)    // its resources can be configured through StreamExecutionEnvironment#registerSlotSharingGroup
- #slotSharingGroup(SlotSharingGroup ssg) // directly configures the resources
And one interface to configure the resources of an SSG:
- StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
We can also define the priority between the two approaches, e.g. the
resources registered in the StreamExecutionEnvironment will always take
precedence in case of a conflict. That would be well documented.

2) Yes, I originally added this interface as a shortcut. It seems
unnecessary now, so I will remove it.

3) I don't think we need to expose the ExternalResource. In the
builder of SlotSharingGroup, we can introduce a
#withExternalResource(String name, double value). This interface also
needs to be annotated as evolving.

4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
elaborate on the Builder for the SlotSharingGroup.
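The precedence rule in point 1 and the #withExternalResource(String name, double value) method from point 3 can be sketched together in plain Java. Only withExternalResource comes from this thread; every other name is hypothetical, and the sketch assumes a registered group replaces an attached one wholesale rather than being merged field by field, which the thread does not specify.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SsgPrecedenceSketch {

    /** Hypothetical group spec: CPU cores plus named external resources (e.g. "gpu"). */
    static final class Group {
        final String name;
        final double cpuCores;
        final Map<String, Double> externalResources;

        private Group(String name, double cpuCores, Map<String, Double> externalResources) {
            this.name = name;
            this.cpuCores = cpuCores;
            this.externalResources = Collections.unmodifiableMap(new HashMap<>(externalResources));
        }

        static Builder newBuilder(String name) {
            return new Builder(name);
        }
    }

    static final class Builder {
        private final String name;
        private double cpuCores;
        private final Map<String, Double> external = new HashMap<>();

        private Builder(String name) {
            this.name = name;
        }

        Builder withCpuCores(double cores) {
            this.cpuCores = cores;
            return this;
        }

        /** Shaped after the proposed #withExternalResource(String name, double value). */
        Builder withExternalResource(String resourceName, double value) {
            external.put(resourceName, value);
            return this;
        }

        Group build() {
            return new Group(name, cpuCores, external);
        }
    }

    /**
     * Effective requirements of an operator's slot sharing group.
     *
     * @param groupName  the group name the operator belongs to
     * @param attached   the group passed via #slotSharingGroup(SlotSharingGroup), or null
     *                   if the operator only used #slotSharingGroup(String name)
     * @param registered groups registered on the environment, keyed by name
     */
    static Optional<Group> effective(String groupName, Group attached, Map<String, Group> registered) {
        Group fromEnv = registered.get(groupName);
        if (fromEnv != null) {
            return Optional.of(fromEnv); // the environment registration wins on conflict
        }
        return Optional.ofNullable(attached); // empty: no fine-grained requirements given
    }

    public static void main(String[] args) {
        Group attached = Group.newBuilder("ssg1")
                .withCpuCores(1.0)
                .withExternalResource("gpu", 1.0)
                .build();
        Map<String, Group> registered = new HashMap<>();
        System.out.println(effective("ssg1", attached, registered).get().cpuCores);

        registered.put("ssg1", Group.newBuilder("ssg1").withCpuCores(2.0).build());
        Group used = effective("ssg1", attached, registered).get();
        System.out.println(used.cpuCores + " " + used.externalResources);
    }
}
```

Once the registration exists, the second lookup reports 2.0 cores and no external resources, because under this assumption the attached group, including its gpu entry, is ignored entirely; a merging rule would be an equally valid design and would need to be spelled out in the documentation.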

WDYT?

Best,
Yangze Guo


Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yangze Guo
Thanks all for the discussion. I've updated the FLIP accordingly; the
key changes are:
- Introduce SlotSharingGroup, which contains the resource spec of a slot
sharing group, in place of ResourceSpec
- Introduce two interfaces for specifying the SlotSharingGroup:
#slotSharingGroup(SlotSharingGroup) and
StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).

If there is no more feedback, I'll start a vote next week.

Best,
Yangze Guo

On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo <[hidden email]> wrote:

>
> Thanks for the valuable suggestion, Arvid.
>
> 1) Yes, we can add a new SlotSharingGroup which includes the name and
> its resources. After that, we would have two interfaces for configuring the
> slot sharing group of an operator:
> - #slotSharingGroup(String name)    // its resources can be configured through StreamExecutionEnvironment#registerSlotSharingGroup
> - #slotSharingGroup(SlotSharingGroup ssg) // directly configures the resources
> And one interface to configure the resources of an SSG:
> - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> We can also define the priority between the two approaches, e.g. the
> resources registered in the StreamExecutionEnvironment will always take
> precedence in case of a conflict. That would be well documented.
>
> 2) Yes, I originally added this interface as a shortcut. It seems
> unnecessary now, so I will remove it.
>
> 3) I don't think we need to expose the ExternalResource. In the
> builder of SlotSharingGroup, we can introduce a
> #withExternalResource(String name, double value). This interface also
> needs to be annotated as evolving.
>
> 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
> elaborate on the Builder for the SlotSharingGroup.
>
> WDYT?
>
> Best,
> Yangze Guo
>
> On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise <[hidden email]> wrote:
> >
> > Hi Yangze,
> >
> > I like the general approach of binding requirements to slot sharing groups. I
> > think the current approach is also flexible enough that a user could simply
> > use ParameterTool or similar to read config values and wire them into their
> > slot sharing groups, so that different requirements can be tested without
> > recompilation. So I don't see an immediate need to provide a generic
> > solution for YAML configuration for now.
> >
> > Looking at the programmatic interface, though, I think we could improve it
> > quite a bit, and I haven't seen these alternatives considered in the
> > FLIP:
> > 1) Add a new class SlotSharingGroup that incorporates all ResourceSpec
> > properties. Instead of using group names, the user could directly configure
> > such an object.
> >
> >         SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name could also be omitted and auto-generated
> >         ssg1.setCPUCores(4);
> >         ...
> >         DataStream<Tuple2<String, Integer>> grades =
> >                 GradeSource
> >                         .getSource(env, rate)
> >                         .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> >                         .slotSharingGroup(ssg1);
> >         DataStream<Tuple2<String, Integer>> salaries =
> >                 SalarySource.getSource(env, rate)
> >                         .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> >                         .slotSharingGroup(ssg2);
> >
> >         // run the actual window join program with the same slot sharing group as grades
> >         DataStream<Tuple3<String, Integer, Integer>> joinedStream =
> >                 runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);
> >
> > Note that we could make it backward compatible by changing the proposed
> > StreamExecutionEnvironment#setSlotSharingGroupResource to
> > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and
> > then using the string name for further reference.
> >
> > 2) I'm also not sure about
> > StreamExecutionEnvironment#setSlotSharingGroupResources. What's the benefit
> > of the Map version over the simple setter? Even if the user has a map
> > slotSharingGroupResources, they could simply call
> > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
> >
> > 3) Is defining the ExternalResource part of this FLIP? I don't see a
> > Public* class for it yet. I'd also be fine with cutting the scope of this
> > FLIP, removing it for now, and annotating ResourceSpec/SlotSharingGroup as
> > evolving.
> >
> > 4) We should probably use a builder pattern around
> > ResourceSpec/SlotSharingGroup, as the current ResourceSpec does. I don't
> > think we need to fully specify that in the FLIP, but it would be good to at
> > least say how users should create them.
> >
> >
> >
> > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo <[hidden email]> wrote:
> >
> > > @Yang
> > > In short, the external resources will participate in resource
> > > deduction and be logically ensured, but requesting an external
> > > resource must still be done through config options with the current
> > > default resource allocation strategy.
> > > In FLIP-56, we abstract the logic of resource allocation to the
> > > `ResourceAllocationStrategy`. Currently, with its default
> > > implementation, ResourceManager would still allocate TMs with the same
> > > resource spec and the external resources of it are configured through
> > > the config option as well. So, in your case, you need to define the
> > > "external-resources" and "external-resources.disk.amount". Then, all
> > > the disk requirements defined in the SSG will be logically ensured, as
> > > there is no slot level isolation. If the disk space of a task manager
> > > cannot fulfill the disk requirement, RM will allocate a new one.
> > > In the future, we'd like to introduce a `ResourceAllocationStrategy`
> > > which allocates heterogeneous TMs according to the requirements. Then,
> > > user only needs to define the driver of external resources when
> > > needed.
> > > Also, regarding the resource isolation, we may provide a fine-grained
> > > mode in which each slot can only fetch the information of external
> > > resources it requires in the future. But that is out of the scope of
> > > this PR.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang <[hidden email]> wrote:
> > > >
> > > > Thanks @Yangze for preparing this FLIP.
> > > >
> > > > I think this is a good starting point for community users to get a
> > > > taste of fine-grained resource management, which we all believe could
> > > > improve Flink job stability and cluster utilization.
> > > >
> > > > I have a simple question about extended resources: is it possible to
> > > > combine extended resources with fine-grained resource management? Apart
> > > > from GPUs, FPGAs and other new computing devices, disk is perhaps a more
> > > > general use case. For example, different SSGs may have different disk
> > > > requirements depending on their state. So we need to allocate enough
> > > > ephemeral storage for every TaskManager pod in a Kubernetes deployment;
> > > > otherwise, the pod might be evicted for exceeding its limits.
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > >
> > > > On Tue, Jun 8, 2021 at 1:47 PM Xintong Song <[hidden email]> wrote:
> > > >
> > > > > I think being able to specify fine-grained resource requirements
> > > > > without having to change the code and recompile the job is indeed a
> > > > > good idea. It definitely improves usability.
> > > > >
> > > > > However, this requires more careful design, which probably deserves a
> > > > > separate thread. It'd be good to have that discussion, but maybe we
> > > > > should not block this feature on it.
> > > > >
> > > > > One idea concerning the configuration approach: as Yangze said, Flink
> > > > > configuration options are supposed to take effect at the cluster level.
> > > > > For updating job-level specifics that are not suitable to be introduced
> > > > > as config options, currently the only way is to pass them as program
> > > > > arguments. Would it make sense to introduce a general approach for
> > > > > overwriting such job specifics without re-compiling the job?
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo <[hidden email]> wrote:
> > > > >
> > > > > > @Wenlong
> > > > > > After further consideration, the config option approach I mentioned
> > > > > > above might not be appropriate. The resource requirements for an SSG
> > > > > > should be a job-level configuration and should not be set in
> > > > > > flink-conf.
> > > > > >
> > > > > > I think we can define a JSON format, mapping SSG names to their
> > > > > > ResourceSpecs, for the resource requirements of a specific job. Then
> > > > > > we allow users to configure the file path of that JSON. The JSON will
> > > > > > only be parsed at runtime, which allows users to tune it without
> > > > > > re-compiling the job.
> > > > > >
> > > > > > We can add another #setSlotSharingGroupResources for configuring the
> > > > > > file path of that JSON:
> > > > > > ```
> > > > > > /**
> > > > > >  * Specify fine-grained resource requirements for slot sharing groups
> > > > > >  * with the given resource JSON file. The existing resource
> > > > > >  * requirement of the same slot sharing group will be replaced.
> > > > > >  */
> > > > > > public StreamExecutionEnvironment setSlotSharingGroupResources(
> > > > > >         String pathToResourceJson);
> > > > > > ```
> > > > > >
> > > > > > WDYT?
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo <[hidden email]> wrote:
> > > > > > >
> > > > > > > Thanks for the feedback, Xintong and Wenlong!
> > > > > > >
> > > > > > > @Wenlong
> > > > > > > I think that is a good idea: adjusting the resources without
> > > > > > > re-compiling the job will facilitate the tuning process.
> > > > > > > We can define a pattern "slot-sharing-group.resource.{ssg name}"
> > > > > > > (any proposal for the prefix naming is welcome) for the resource
> > > > > > > spec config of a slot sharing group. Then, users can set the
> > > > > > > ResourceSpec of SSG "ssg1" by adding
> > > > > > > "slot-sharing-group.resource.ssg1: {cpu: 1.0, heap: 100m,
> > > > > > > off-heap: 100m....}". WDYT?
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <[hidden email]> wrote:
> > > > > > > >
> > > > > > > > Thanks Yangze for the FLIP, it is great for users to be able to
> > > > > > > > declare the fine-grained resource requirements for the job.
> > > > > > > >
> > > > > > > > I have one minor suggestion: can we support setting resource
> > > > > > > > requirements by configuration? Currently most of the options in
> > > > > > > > the execution config can be set via configuration, and it is
> > > > > > > > very likely that users need to adjust the resources according to
> > > > > > > > the performance of their job during debugging. Providing a
> > > > > > > > configuration way will make it more convenient.
> > > > > > > >
> > > > > > > > Bests,
> > > > > > > > Wenlong Lyu
> > > > > > > >
> > > > > > > > On Thu, 3 Jun 2021 at 15:59, Xintong Song <[hidden email]> wrote:
> > > > > > > > >
> > > > > > > > > Thanks Yangze for preparing the FLIP.
> > > > > > > > >
> > > > > > > > > The proposed changes look good to me.
> > > > > > > > >
> > > > > > > > > As you've mentioned in the implementation plan, I believe one of
> > > > > > > > > the most important tasks of this FLIP is to have the feature
> > > > > > > > > well documented. It would be really nice if we can keep that in
> > > > > > > > > mind and start drafting the documentation early.
> > > > > > > > >
> > > > > > > > > Thank you~
> > > > > > > > >
> > > > > > > > > Xintong Song
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo <[hidden email]> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi, there,
> > > > > > > > > >
> > > > > > > > > > We would like to start a discussion thread on "FLIP-169:
> > > > > > > > > > DataStream API for Fine-Grained Resource Requirements"[1],
> > > > > > > > > > where we propose the DataStream API for specifying
> > > > > > > > > > fine-grained resource requirements in
> > > > > > > > > > StreamExecutionEnvironment.
> > > > > > > > > >
> > > > > > > > > > Please find more details in the FLIP wiki document [1].
> > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yangze Guo
> > > > > > > > > >
> > > > > > > > >
> > > > > >
> > > > >
> > >
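For illustration only, a minimal Java sketch of how values for the "slot-sharing-group.resource.{ssg name}" key pattern floated above might be collected and parsed into per-SSG specs. `SsgSpec`, `SsgConfigParser`, and the accepted size units (`m`, `g`) are hypothetical names and assumptions, not Flink API. Only the `cpu`, `heap` and `off-heap` fields from the example value are handled; the elided remaining fields are left out. The same parsing would apply whether the key/value map comes from flink-conf or from a job-level source, which is where the thread later leaned.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified per-SSG resource spec (not Flink's ResourceSpec). */
final class SsgSpec {
    final double cpuCores;
    final long heapMb;
    final long offHeapMb;

    SsgSpec(double cpuCores, long heapMb, long offHeapMb) {
        this.cpuCores = cpuCores;
        this.heapMb = heapMb;
        this.offHeapMb = offHeapMb;
    }

    @Override
    public String toString() {
        return "SsgSpec{cpu=" + cpuCores + ", heap=" + heapMb + "m, off-heap=" + offHeapMb + "m}";
    }
}

final class SsgConfigParser {
    // Assumed key prefix, taken from the naming proposed in the thread.
    static final String PREFIX = "slot-sharing-group.resource.";

    /** Collects every "slot-sharing-group.resource.<name>" entry into a name -> spec map. */
    static Map<String, SsgSpec> parse(Map<String, String> config) {
        Map<String, SsgSpec> specs = new HashMap<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                String ssgName = e.getKey().substring(PREFIX.length());
                specs.put(ssgName, parseSpec(e.getValue()));
            }
        }
        return specs;
    }

    /** Parses a value such as "{cpu: 1.0, heap: 100m, off-heap: 100m}". */
    static SsgSpec parseSpec(String value) {
        String body = value.trim();
        if (!body.startsWith("{") || !body.endsWith("}")) {
            throw new IllegalArgumentException("Expected {...}: " + value);
        }
        body = body.substring(1, body.length() - 1);
        double cpu = 0.0;
        long heap = 0L;
        long offHeap = 0L;
        for (String field : body.split(",")) {
            String[] kv = field.split(":", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Malformed field: " + field);
            }
            String key = kv[0].trim();
            String val = kv[1].trim();
            switch (key) {
                case "cpu": cpu = Double.parseDouble(val); break;
                case "heap": heap = parseMb(val); break;
                case "off-heap": offHeap = parseMb(val); break;
                default: throw new IllegalArgumentException("Unknown field: " + key);
            }
        }
        return new SsgSpec(cpu, heap, offHeap);
    }

    /** Accepts sizes like "100m" or "1g" and returns megabytes. */
    static long parseMb(String size) {
        String s = size.toLowerCase();
        char unit = s.charAt(s.length() - 1);
        long amount = Long.parseLong(s.substring(0, s.length() - 1));
        if (unit == 'm') return amount;
        if (unit == 'g') return amount * 1024;
        throw new IllegalArgumentException("Unsupported size unit: " + size);
    }
}
```

Keys without the prefix are simply ignored, so such entries could live alongside ordinary options.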

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Arvid Heise-4
Hi Yangze,

Thanks for incorporating the ideas, and sorry for missing the builder part.
My main idea is that SlotSharingGroup should be immutable, so that the user
doesn't do:

ssg = new SlotSharingGroup();
ssg.setCpus(2);
operator1.slotSharingGroup(ssg);
ssg.setCpus(4);
operator2.slotSharingGroup(ssg);

and then wonder why both operators have the same CPU spec. But the details can
be fleshed out in the actual PR.
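A minimal sketch, under stated assumptions, of the immutability idea: a mutable builder produces a new immutable group on every build() call, so changing the CPU spec afterwards cannot silently affect a group already handed to an operator. `SsgSketch`, `newBuilder`, `setCpuCores` and `setTaskHeapMemoryMb` are hypothetical names chosen for illustration, not necessarily the API the FLIP ends up with.

```java
/** Hypothetical immutable slot sharing group; all fields are final and set once via the builder. */
final class SsgSketch {
    private final String name;
    private final double cpuCores;
    private final int taskHeapMemoryMb;

    private SsgSketch(Builder b) {
        this.name = b.name;
        this.cpuCores = b.cpuCores;
        this.taskHeapMemoryMb = b.taskHeapMemoryMb;
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    String getName() { return name; }

    double getCpuCores() { return cpuCores; }

    int getTaskHeapMemoryMb() { return taskHeapMemoryMb; }

    /** Mutable builder; each build() snapshots the current values into a new immutable object. */
    static final class Builder {
        private final String name;
        private double cpuCores;
        private int taskHeapMemoryMb;

        private Builder(String name) {
            this.name = name;
        }

        Builder setCpuCores(double cpuCores) {
            if (cpuCores <= 0) throw new IllegalArgumentException("cpuCores must be positive");
            this.cpuCores = cpuCores;
            return this;
        }

        Builder setTaskHeapMemoryMb(int mb) {
            if (mb <= 0) throw new IllegalArgumentException("heap must be positive");
            this.taskHeapMemoryMb = mb;
            return this;
        }

        SsgSketch build() {
            return new SsgSketch(this);
        }
    }
}
```

With this shape, building once with 2 CPU cores, then calling setCpuCores(4) on the same builder and building again, yields two distinct objects: the first still reports 2.0 cores, which is exactly the surprise the snippet above is meant to avoid.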

On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo <[hidden email]> wrote:

> Thanks all for the discussion. I've updated the FLIP accordingly. The
> key changes are:
> - Introduce SlotSharingGroup, which contains the resource spec of the slot
> sharing group, instead of ResourceSpec
> - Introduce two interfaces for specifying the SlotSharingGroup:
> #slotSharingGroup(SlotSharingGroup) and
> StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
>
> If there is no more feedback, I'd start a vote next week.
>
> Best,
> Yangze Guo
>
> On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo <[hidden email]> wrote:
> >
> > Thanks for the valuable suggestions, Arvid.
> >
> > 1) Yes, we can add a new SlotSharingGroup which includes the name and
> > its resources. After that, we have two interfaces for configuring the
> > slot sharing group of an operator:
> > - #slotSharingGroup(String name) // its resources can be configured through
> >   StreamExecutionEnvironment#registerSlotSharingGroup
> > - #slotSharingGroup(SlotSharingGroup ssg) // directly configures the resources
> > And one interface to configure the resources of an SSG:
> > - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> > We can also define the priority of the above two approaches, e.g. the
> > resources registered in the StreamExecutionEnvironment will always be
> > respected in case of conflict. That would be well documented.
> >
> > 2) Yes, I originally added this interface as a shortcut. It seems
> > unnecessary now. Will remove it.
> >
> > 3) I don't think we need to expose the ExternalResource. In the
> > builder of SlotSharingGroup, we can introduce a
> > #withExternalResource(String name, double value). Also, this interface
> > needs to be annotated as evolving.
> >
> > 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
> > elaborate on the Builder for the SlotSharingGroup.
> >
> > WDYT?
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise <[hidden email]> wrote:
> > >
> > > Hi Yangze,
> > >
> > > I like the general approach of binding requirements to slot sharing
> > > groups. I think the current approach is also flexible enough that a user
> > > could simply use ParameterTool or similar to read config values and wire
> > > them into their slot sharing groups, so that different requirements can
> > > be tested without recompilation. So I don't see an immediate need to
> > > provide a generic solution for YAML configuration for now.
> > >
> > > Looking at the programmatic interface, though, I think we could improve
> > > it quite a bit, and I haven't seen these alternatives being considered in
> > > the FLIP:
> > > 1) Add a new class SlotSharingGroup that incorporates all ResourceSpec
> > > properties. Instead of using group names, the user could directly
> > > configure such an object.
> > >
> > >         SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name could also be omitted and auto-generated
> > >         ssg1.setCPUCores(4);
> > >         ...
> > >         DataStream<Tuple2<String, Integer>> grades =
> > >                 GradeSource
> > >                         .getSource(env, rate)
> > >                         .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > >                         .slotSharingGroup(ssg1);
> > >         DataStream<Tuple2<String, Integer>> salaries =
> > >                 SalarySource.getSource(env, rate)
> > >                         .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > >                         .slotSharingGroup(ssg2);
> > >
> > >         // run the actual window join program with the same slot sharing
> > >         // group as grades
> > >         DataStream<Tuple3<String, Integer, Integer>> joinedStream =
> > >                 runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);
> > >
> > > Note that we could make it backward compatible by changing the proposed
> > > StreamExecutionEnvironment#setSlotSharingGroupResource to
> > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and
> > > then use the string name for further reference.
> > >
> > > 2) I'm also not sure about
> > > StreamExecutionEnvironment#setSlotSharingGroupResources. What's the
> > > benefit of the Map version over the simple setter? Even if the user has
> > > a map slotSharingGroupResources, they could simply do
> > > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
> > >
> > > 3) Is defining the ExternalResource part of this FLIP? I don't see a
> > > Public* class yet. I'd also be fine with cutting the scope of this FLIP,
> > > removing it for now, and annotating ResourceSpec/SlotSharingGroup as
> > > evolving.
> > >
> > > 4) We should probably use a builder pattern around
> > > ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't
> > > think we need to fully specify that in the FLIP, but it would be good to
> > > at least say how they should be created by the user.
> > >
> > >
> > >
> > > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo <[hidden email]> wrote:
> > >
> > > > @Yang
> > > > In short, external resources will participate in resource deduction
> > > > and be logically ensured, but requesting an external resource must
> > > > still be done through config options with the current default
> > > > resource allocation strategy.
> > > > In FLIP-56, we abstract the logic of resource allocation into the
> > > > `ResourceAllocationStrategy`. Currently, with its default
> > > > implementation, the ResourceManager still allocates TMs with the same
> > > > resource spec, and their external resources are configured through
> > > > config options as well. So, in your case, you need to define
> > > > "external-resources" and "external-resource.disk.amount". Then, all
> > > > the disk requirements defined in the SSGs will be logically ensured,
> > > > as there is no slot-level isolation. If the disk space of a
> > > > TaskManager cannot fulfill the disk requirement, the RM will allocate
> > > > a new one.
> > > > In the future, we'd like to introduce a `ResourceAllocationStrategy`
> > > > that allocates heterogeneous TMs according to the requirements. Then,
> > > > users only need to define the driver of external resources when
> > > > needed.
> > > > Also, regarding resource isolation, we may in the future provide a
> > > > fine-grained mode in which each slot can only fetch the information
> > > > of the external resources it requires. But that is out of the scope
> > > > of this PR.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
>
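A minimal sketch, under assumptions, of the conflict rule Yangze describes: resources registered in the StreamExecutionEnvironment win over resources attached to an operator via #slotSharingGroup(SlotSharingGroup). `GroupResources`, `SsgResolver`, `attachToOperator` and `resolve` are hypothetical stand-ins, not Flink classes; `withExternalResource(String, double)` only mirrors the builder method name suggested in the thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical resource description for a slot sharing group. */
final class GroupResources {
    final double cpuCores;
    final Map<String, Double> externalResources;

    private GroupResources(double cpuCores, Map<String, Double> externalResources) {
        this.cpuCores = cpuCores;
        this.externalResources = Collections.unmodifiableMap(new HashMap<>(externalResources));
    }

    static final class Builder {
        private double cpuCores;
        private final Map<String, Double> external = new HashMap<>();

        Builder cpuCores(double cpuCores) {
            this.cpuCores = cpuCores;
            return this;
        }

        // Mirrors the #withExternalResource(String name, double value) idea from the thread.
        Builder withExternalResource(String name, double value) {
            external.put(name, value);
            return this;
        }

        GroupResources build() {
            return new GroupResources(cpuCores, external);
        }
    }
}

/** Hypothetical resolver: environment-registered resources take priority on conflict. */
final class SsgResolver {
    private final Map<String, GroupResources> registered = new HashMap<>();
    private final Map<String, GroupResources> attached = new HashMap<>();

    /** Stand-in for registering a group on the environment. */
    void registerSlotSharingGroup(String name, GroupResources resources) {
        registered.put(name, resources);
    }

    /** Stand-in for attaching a group to an operator; later calls overwrite earlier ones here. */
    void attachToOperator(String name, GroupResources resources) {
        attached.put(name, resources);
    }

    /** Returns the effective resources; empty means no fine-grained spec is known for the name. */
    Optional<GroupResources> resolve(String name) {
        GroupResources fromEnv = registered.get(name);
        if (fromEnv != null) {
            return Optional.of(fromEnv);
        }
        return Optional.ofNullable(attached.get(name));
    }
}
```

Keeping the rule in one lookup like this also makes it easy to document, which the thread asks for.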

Re: Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yun Gao
Hi,

Many thanks to @Yangze for bringing up this discussion. Overall +1 for
exposing the fine-grained resource requirements in the DataStream API.

A similar issue to the one Arvid pointed out is that users may also create
different SlotSharingGroup objects with the same name but different
resources. We might need to do some checks internally, but we could also
leave that to the development of the actual PR.

Best,
Yun
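One possible shape for such an internal check, sketched under the assumption that only CPU cores and heap memory matter: re-declaring a name with identical resources is accepted, while reusing a name with different resources is rejected. `NamedGroup` and `GroupConsistencyChecker` are hypothetical, not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical value type: a named group plus its CPU and heap requirements. */
final class NamedGroup {
    final String name;
    final double cpuCores;
    final int heapMb;

    NamedGroup(String name, double cpuCores, int heapMb) {
        this.name = Objects.requireNonNull(name);
        this.cpuCores = cpuCores;
        this.heapMb = heapMb;
    }

    /** True if both groups request the same resources (names are compared by the caller). */
    boolean sameResourcesAs(NamedGroup other) {
        return Double.compare(cpuCores, other.cpuCores) == 0 && heapMb == other.heapMb;
    }
}

/** Remembers the first declaration per name and rejects conflicting re-declarations. */
final class GroupConsistencyChecker {
    private final Map<String, NamedGroup> seen = new HashMap<>();

    void check(NamedGroup group) {
        NamedGroup previous = seen.putIfAbsent(group.name, group);
        if (previous != null && !previous.sameResourcesAs(group)) {
            throw new IllegalStateException(
                    "Slot sharing group '" + group.name + "' is declared with conflicting resources");
        }
    }
}
```

Failing fast at graph-building time would surface the mistake before any resources are requested, rather than letting one declaration silently override the other.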



 ------------------Original Mail ------------------
Sender:Arvid Heise <[hidden email]>
Send Date:Thu Jun 10 15:33:37 2021
Recipients:dev <[hidden email]>
Subject:Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements
Hi Yangze,



Thanks for incorporating the ideas and sorry for missing the builder part.

My main idea is that SlotSharingGroup is immutable, such that the user

doesn't do:



ssg = new SlotSharingGroup();

ssg.setCpus(2);

operator1.slotSharingGroup(ssg);

ssg.setCpus(4);

operator2.slotSharingGroup(ssg);



and wonders why both operators have the same CPU spec. But the details can

be fleshed out in the actual PR.



On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo  wrote:



> Thanks all for the discussion. I've updated the FLIP accordingly, the

> key changes are:

> - Introduce SlotSharingGroup instead of ResourceSpec which contains

> the resource spec of slot sharing group

> - Introduce two interfaces for specifying the SlotSharingGroup:

> #slotSharingGroup(SlotSharingGroup) and

> StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).

>

> If there is no more feedback, I'd start a vote next week.

>

> Best,

> Yangze Guo

>

> On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo  wrote:

> >

> > Thanks for the valuable suggestion, Arvid.

> >

> > 1) Yes, we can add a new SlotSharingGroup which includes the name and

> > its resource. After that, we have two interfaces for configuring the

> > slot sharing group of an operator:

> > - #slotSharingGroup(String name) // the resource of it can be

> > configured through StreamExecutionEnvironment#registerSlotSharingGroup

> > - #slotSharingGroup(SlotSharingGroup ssg) // Directly configure the

> resource

> > And one interface to configure the resource of a SSG:

> > - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)

> > We can also define the priority of the above two approaches, e.g. the

> > resource registering in the StreamExecutionEnvironment will always be

> > respected when conflict. That would be well documented.

> >

> > 2) Yes, I originally add this interface as a shortcut. It seems

> > unnecessary now. Will remove it.

> >

> > 3) I don't think we need to expose the ExternalResource. In the

> > builder of SlotSharingGroup, we can introduce a

> > #withExternalResource(String name, double value). Also, this interface

> > needs to be annotated as evolving.

> >

> > 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to

> > elaborate on the Builder for the SlotSharingGroup.

> >

> > WDYT?

> >

> > Best,

> > Yangze Guo

> >

> > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise  wrote:

> > >

> > > Hi Yangze,

> > >

> > > I like the general approach to bind requirements to slotsharing

> groups. I

> > > think the current approach is also flexible enough that a user could

> simply

> > > use ParameterTool or similar to use config values and wire that with

> their

> > > slotgroups, such that different requirements can be tested without

> > > recompilation. So I don't see an immediate need to provide a generic

> > > solution for yaml configuration for now.

> > >

> > > Looking at the programmatic interface though, I think we could improve

> by

> > > quite a bit and I haven't seen these alternatives being considered in

> the

> > > FLIP:

> > > 1) Add new class SlotSharingGroup that incorporates all ResourceSpec

> > > properties. Instead of using group names, the user could directly

> configure

> > > such an object.

> > >

> > > SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name

> > > could also be omitted and auto-generated

> > > ssg1.setCPUCores(4);

> > > ...

> > > DataStream> grades =

> > > GradeSource

> > > .getSource(env, rate)

> > >

> > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())

> > > .slotSharingGroup(ssg1);

> > > DataStream> salaries =

> > > SalarySource.getSource(env, rate)

> > >

> > > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())

> > > .slotSharingGroup(ssg2);

> > >

> > > // run the actual window join program with the same slot

> sharing

> > > group as grades

> > > DataStream> joinedStream =

> > > runWindowJoin(grades, salaries,

> > > windowSize).slotSharingGroup(ssg1);

> > >

> > > Note that we could make it backward compatible by changing the proposed

> > > StreamExecutionEnvironment#setSlotSharingGroupResource to

> > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)

> and

> > > then use the string name for further reference.

> > >

> > > 2) I'm also not sure on the StreamExecutionEnvironment#

> > > setSlotSharingGroupResources. What's the benefit of the Map version

> over

> > > having the simple setter? Even if the user has a map

> > > slotSharingGroupResources, he could simply do

> > > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);

> > >

> > > 3) Is defining the ExternalResource part of this FLIP? I don't see a

> > > Public* class yet. I'd be also fine to cut the scope of this FLIP and

> > > remove it for now and annotate ResourceSpec/SlotSharingGroup evolving.

> > >

> > > 4) We should probably use a builder pattern around

> > > ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't

> think

> > > we need to fully specify that in the FLIP but it would be good to at

> least

> > > say how they should be created by the user.

> > >

> > >

> > >

> > > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo  wrote:

> > >

> > > > @Yang

> > > > In short, the external resources will participate in resource

> > > > deduction and be logically ensured, but requesting an external

> > > > resource must still be done through config options with the current

> > > > default resource allocation strategy.

> > > > In FLIP-56, we abstract the logic of resource allocation to the

> > > > `ResourceAllocationStrategy`. Currently, with its default

> > > > implementation, ResourceManager would still allocate TMs with the

> same

> > > > resource spec and the external resources of it are configured through

> > > > the config option as well. So, in your case, you need to define the

> > > > "external-resources" and "external-resources.disk.amount". Then, all

> > > > the disk requirements defined in the SSG will be logically ensured,

> as

> > > > there is no slot level isolation. If the disk space of a task manager

> > > > cannot fulfill the disk requirement, RM will allocate a new one.

> > > > In the future, we'd like to introduce a `ResourceAllocationStrategy`

> > > > which allocates heterogeneous TMs according to the requirements.

> Then,

> > > > user only needs to define the driver of external resources when

> > > > needed.

> > > > Also, regarding the resource isolation, we may provide a fine-grained

> > > > mode in which each slot can only fetch the information of external

> > > > resources it requires in the future. But that is out of the scope of

> > > > this PR.

> > > >

> > > > Best,

> > > > Yangze Guo

> > > >

> > > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang

> wrote:

> > > > >

> > > > > Thanks @Yangze for preparing this FLIP.

> > > > >

> > > > > I think this is a good start point for the community users to have

> a

> > > > taste

> > > > > on the fine-grained

> > > > > resource management, which we all believe it could improve the

> Flink job

> > > > > stability and

> > > > > cluster utilization.

> > > > >

> > > > > I have a simple question about the extended resources. It is

> possible to

> > > > > combine extended resources

> > > > > with fine-grained resource management. Except for the GPU, FPGA

> and other

> > > > > new computing devices,

> > > > > maybe the disk resource is a more general use case. For example,

> > > > different

> > > > > SSG may have various

> > > > > disk requirements based on the state. So we need to allocate enough

> > > > > ephemeral storage resource for every

> > > > > TaskManager pod in Kubernetes deployment. Otherwise, it might be

> evicted

> > > > > due to running out of limits.

> > > > >

> > > > >

> > > > > Best,

> > > > > Yang

> > > > >

> > > > >

> > > > > Xintong Song  于2021年6月8日周二 下午1:47写道:

> > > > >

> > > > > > I think being able to specify fine grained resource requirements

> > > > without

> > > > > > having to change the codes and recompile the job is indeed a good

> > > > idea. It

> > > > > > definitely improves the usability.

> > > > > >

> > > > > > However, this requires more careful designs, which probably

> deserves a

> > > > > > separate thread. I'd be good to have that discussion, but maybe

> not

> > > > block

> > > > > > this feature on that.

> > > > > >

> > > > > > One idea concerning the configuration approach: As Yangze said,

> flink

> > > > > > configuration options are supposed to take effect at cluster

> level. For

> > > > > > updating job level specifics that are not suitable to be

> introduced as

> > > > a

> > > > > > config option, currently the only way is to pass them as program

> > > > arguments.

> > > > > > Would it make sense to introduce a general approach for

> overwriting

> > > > such

> > > > > > job specifics without re-compiling the job?

> > > > > >

> > > > > > Thank you~

> > > > > >

> > > > > > Xintong Song

> > > > > >

> > > > > >

> > > > > >

> > > > > > On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo

> wrote:

> > > > > >

> > > > > > > @Wenlong

> > > > > > > After another consideration, the config option approach I

> mentioned

> > > > > > > above might not be appropriate. The resource requirements for

> SSG

> > > > > > > should be a job level configuration and should no be set in the

> > > > > > > flink-conf.

> > > > > > >

> > > > > > > I think we can define a JSON format, which would be the

> ResourceSpecs

> > > > > > > mapped by the name of SSGs, for the resource requirements of a

> > > > > > > specific job. Then, we allow user to configure the file path

> of that

> > > > > > > JSON. The JSON will be only parsed in runtime, which allows

> user to

> > > > > > > tune it without re-compiling the job.

> > > > > > >

> > > > > > > We can add another #setSlotSharingGroupResources for

> configuring the

> > > > > > > file path of that JSON:

> > > > > > > ```

> > > > > > > /**

> > > > > > > * Specify fine-grained resource requirements for slot sharing

> groups

> > > > > > > with the given resource JSON file. The existing resource

> > > > > > > * requirement of the same slot sharing group will be replaced.

> > > > > > > */

> > > > > > > public StreamExecutionEnvironment setSlotSharingGroupResources(

> > > > > > > String pathToResourceJson);

> > > > > > > ```

> > > > > > >

> > > > > > > WDYT?

> > > > > > >

> > > > > > > Best,

> > > > > > > Yangze Guo

> > > > > > >

> > > > > > > On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo wrote:
> > > > > > > >
> > > > > > > > Thanks for the feedback, Xintong and Wenlong!
> > > > > > > >
> > > > > > > > @Wenlong
> > > > > > > > I think that is a good idea; adjusting the resources without
> > > > > > > > re-compiling the job will facilitate the tuning process.
> > > > > > > > We can define a pattern "slot-sharing-group.resource.{ssg name}"
> > > > > > > > (any proposal for the prefix naming is welcome) for the resource
> > > > > > > > spec config of a slot sharing group. Then, users can set the
> > > > > > > > ResourceSpec of SSG "ssg1" by adding
> > > > > > > > "slot-sharing-group.resource.ssg1: {cpu: 1.0, heap: 100m, off-heap: 100m....}".
> > > > > > > > WDYT?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yangze Guo
> > > > > > > >
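For illustration, here is a minimal sketch of how the option pattern above might be parsed. It assumes the options arrive as a flat key/value map and that values use the `{key: value, ...}` form from the example. `SsgResourceConfigParser` and its methods are hypothetical names. The pattern was only a proposal in this thread, and Yangze later suggested a JSON file instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical parser for the proposed option pattern
// "slot-sharing-group.resource.{ssg name}: {cpu: 1.0, heap: 100m, ...}".
// None of these names are Flink API; they only illustrate the idea.
final class SsgResourceConfigParser {
    static final String PREFIX = "slot-sharing-group.resource.";

    /** Returns ssgName -> (resourceKey -> raw value) for all options using the prefix. */
    static Map<String, Map<String, String>> parse(Map<String, String> options) {
        Map<String, Map<String, String>> result = new HashMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (!e.getKey().startsWith(PREFIX)) {
                continue; // unrelated option, ignore it
            }
            String ssgName = e.getKey().substring(PREFIX.length());
            result.put(ssgName, parseSpec(e.getValue()));
        }
        return result;
    }

    /** Parses "{cpu: 1.0, heap: 100m}" into {cpu=1.0, heap=100m}. */
    static Map<String, String> parseSpec(String value) {
        String body = value.trim();
        if (!body.startsWith("{") || !body.endsWith("}")) {
            throw new IllegalArgumentException("Expected {...}: " + value);
        }
        body = body.substring(1, body.length() - 1);
        Map<String, String> spec = new HashMap<>();
        for (String pair : body.split(",")) {
            if (pair.isBlank()) {
                continue; // tolerate "{}" and trailing commas
            }
            String[] kv = pair.split(":", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Bad entry: " + pair);
            }
            spec.put(kv[0].trim(), kv[1].trim());
        }
        return spec;
    }

    /** Converts a size such as "100m" or "2g" to mebibytes; only m and g are supported. */
    static long toMebiBytes(String size) {
        String s = size.trim().toLowerCase();
        if (s.endsWith("g")) {
            return Long.parseLong(s.substring(0, s.length() - 1)) * 1024;
        }
        if (s.endsWith("m")) {
            return Long.parseLong(s.substring(0, s.length() - 1));
        }
        throw new IllegalArgumentException("Unsupported size: " + size);
    }
}
```

Parsing only at job submission matches the tuning goal. A changed value takes effect on the next submission without recompiling.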

> > > > > > > > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl <[hidden email]> wrote:
> > > > > > > > >
> > > > > > > > > Thanks Yangze for the FLIP; it is great for users to be able to
> > > > > > > > > declare the fine-grained resource requirements for the job.
> > > > > > > > >
> > > > > > > > > I have one minor suggestion: can we support setting resource
> > > > > > > > > requirements by configuration? Currently, most of the options in the
> > > > > > > > > execution config can be set via configuration, and it is very likely
> > > > > > > > > that users will need to adjust the resources according to the
> > > > > > > > > performance of their job during debugging. Providing a
> > > > > > > > > configuration approach would make this more convenient.
> > > > > > > > >
> > > > > > > > > Bests,
> > > > > > > > > Wenlong Lyu
> > > > > > > > >



Re: Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

Yangze Guo
Thanks for the supplement, Arvid and Yun. I've annotated these two
points in the FLIP.
The vote has now started in [1].

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-169-DataStream-API-for-Fine-Grained-Resource-Requirements-td51381.html

Best,
Yangze Guo

On Fri, Jun 11, 2021 at 2:50 PM Yun Gao <[hidden email]> wrote:

>
> Hi,
>
> Many thanks @Yangze for bringing up this discussion. Overall +1 for
> exposing the fine-grained resource requirements in the DataStream API.
>
> One issue similar to the one Arvid pointed out is that users may also create
> different SlotSharingGroup objects with the same name but different
> resources. We might need to do some checks internally, but we could also
> leave that to the development of the actual PR.
>
> Best,
> Yun
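Yun's concern can be illustrated with a small sketch of such an internal check. It uses a registry keyed by group name that accepts repeated identical definitions and rejects a second definition of the same name with different resources. `SsgSpec` and `SsgRegistry` are hypothetical names, and only CPU cores and task heap are modelled. This is one possible check, not how Flink implements it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified slot sharing group definition (not Flink's class).
final class SsgSpec {
    final String name;
    final double cpuCores;
    final int taskHeapMB;

    SsgSpec(String name, double cpuCores, int taskHeapMB) {
        this.name = Objects.requireNonNull(name);
        this.cpuCores = cpuCores;
        this.taskHeapMB = taskHeapMB;
    }

    boolean sameResources(SsgSpec other) {
        return Double.compare(cpuCores, other.cpuCores) == 0 && taskHeapMB == other.taskHeapMB;
    }
}

// Collects the groups referenced while building a job and fails fast when two
// objects share a name but declare different resources.
final class SsgRegistry {
    private final Map<String, SsgSpec> byName = new HashMap<>();

    void register(SsgSpec spec) {
        SsgSpec existing = byName.putIfAbsent(spec.name, spec);
        if (existing != null && !existing.sameResources(spec)) {
            throw new IllegalArgumentException(
                    "Slot sharing group '" + spec.name + "' is defined with conflicting resources");
        }
    }

    int size() {
        return byName.size();
    }
}
```

Failing at job-building time keeps the error close to the user's code. Silently picking one of the two definitions would hide the mistake.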
>
> ------------------ Original Mail ------------------
> Sender: Arvid Heise <[hidden email]>
> Send Date: Thu Jun 10 15:33:37 2021
> Recipients: dev <[hidden email]>
> Subject: Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements
>
> Hi Yangze,
>
> Thanks for incorporating the ideas, and sorry for missing the builder part.
> My main point is that SlotSharingGroup should be immutable, so that the user
> cannot do:
>
> ssg = new SlotSharingGroup();
> ssg.setCpus(2);
> operator1.slotSharingGroup(ssg);
> ssg.setCpus(4);
> operator2.slotSharingGroup(ssg);
>
> and then wonder why both operators have the same CPU spec. But the details
> can be fleshed out in the actual PR.
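Here is a minimal sketch of the immutable variant Arvid describes. It assumes a builder that also carries the #withExternalResource(String name, double value) idea from this thread. `ImmutableSsg` and its methods are hypothetical names, not Flink's API. Because build() copies the builder state, reusing the builder cannot change a group that was already handed to an operator.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable slot sharing group with a builder (illustrative only).
final class ImmutableSsg {
    private final String name;
    private final double cpuCores;
    private final Map<String, Double> externalResources;

    private ImmutableSsg(Builder b) {
        this.name = b.name;
        this.cpuCores = b.cpuCores;
        // Defensive, read-only copy: later builder changes cannot leak into this object.
        this.externalResources = Collections.unmodifiableMap(new HashMap<>(b.externalResources));
    }

    static Builder newBuilder(String name) {
        return new Builder(name);
    }

    String getName() { return name; }
    double getCpuCores() { return cpuCores; }
    Map<String, Double> getExternalResources() { return externalResources; }

    static final class Builder {
        private final String name;
        private double cpuCores;
        private final Map<String, Double> externalResources = new HashMap<>();

        private Builder(String name) {
            this.name = name;
        }

        Builder setCpuCores(double cpuCores) {
            if (cpuCores <= 0) {
                throw new IllegalArgumentException("cpuCores must be > 0");
            }
            this.cpuCores = cpuCores;
            return this;
        }

        // Mirrors the #withExternalResource(String name, double value) idea from the thread.
        Builder withExternalResource(String resourceName, double value) {
            externalResources.put(resourceName, value);
            return this;
        }

        ImmutableSsg build() {
            return new ImmutableSsg(this);
        }
    }
}
```

Arvid's mutable example would be written here as two separate build() calls. Each operator then keeps the CPU value it was given.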
>
> On Thu, Jun 10, 2021 at 5:13 AM Yangze Guo wrote:
>
> > Thanks all for the discussion. I've updated the FLIP accordingly. The
> > key changes are:
> > - Introduce SlotSharingGroup, which contains the resource spec of a slot
> >   sharing group, instead of ResourceSpec.
> > - Introduce two interfaces for specifying the SlotSharingGroup:
> >   #slotSharingGroup(SlotSharingGroup) and
> >   StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).
> >
> > If there is no more feedback, I'll start a vote next week.
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo wrote:
> > >
> > > Thanks for the valuable suggestions, Arvid.
> > >
> > > 1) Yes, we can add a new SlotSharingGroup class that includes the name and
> > > its resources. After that, we have two interfaces for configuring the
> > > slot sharing group of an operator:
> > > - #slotSharingGroup(String name) // its resources can be configured
> > >   through StreamExecutionEnvironment#registerSlotSharingGroup
> > > - #slotSharingGroup(SlotSharingGroup ssg) // directly configures the resources
> > > And one interface to configure the resources of an SSG:
> > > - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> > > We can also define the priority of the above two approaches, e.g. the
> > > resources registered in the StreamExecutionEnvironment always take
> > > precedence in case of a conflict. That would be well documented.
> > >
> > > 2) Yes, I originally added this interface as a shortcut. It seems
> > > unnecessary now. I will remove it.
> > >
> > > 3) I don't think we need to expose the ExternalResource. In the
> > > builder of SlotSharingGroup, we can introduce a
> > > #withExternalResource(String name, double value). Also, this interface
> > > needs to be annotated as evolving.
> > >
> > > 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
> > > elaborate on the Builder for the SlotSharingGroup.
> > >
> > > WDYT?
> > >
> > > Best,
> > > Yangze Guo
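The precedence rule in point 1 can be sketched as a small resolver. It assumes that resources registered on the environment win over the resources carried by a group object passed to the operator, and that an operator that only references a name falls back to default resources when nothing is registered. `SsgResolver` is a hypothetical name, and only CPU cores are modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical resolution of a slot sharing group's CPU cores under the proposed
// rule: environment registrations take precedence over operator-level objects.
final class SsgResolver {
    // Stand-in for what env.registerSlotSharingGroup(...) would record, keyed by group name.
    private final Map<String, Double> registeredCpu = new HashMap<>();

    void register(String groupName, double cpuCores) {
        registeredCpu.put(groupName, cpuCores);
    }

    /**
     * @param groupName   name referenced by the operator
     * @param operatorCpu CPU cores from a group object set directly on the operator,
     *                    or empty if the operator only referenced the name
     * @return the effective CPU cores, or empty if unspecified (default resources)
     */
    Optional<Double> resolveCpu(String groupName, Optional<Double> operatorCpu) {
        Double registered = registeredCpu.get(groupName);
        if (registered != null) {
            return Optional.of(registered); // environment registration takes precedence
        }
        return operatorCpu;
    }
}
```

A fixed, documented winner keeps the outcome predictable when both mechanisms are used for the same name.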
> > >
> > > On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise wrote:
> > > >
> > > > Hi Yangze,
> > > >
> > > > I like the general approach of binding requirements to slot sharing
> > > > groups. I think the current approach is also flexible enough that a user
> > > > could simply use ParameterTool or similar to read config values and wire
> > > > them to their slot sharing groups, so that different requirements can be
> > > > tested without recompilation. So I don't see an immediate need to
> > > > provide a generic solution for YAML configuration for now.
> > > >
> > > > Looking at the programmatic interface, though, I think we could improve
> > > > it quite a bit, and I haven't seen these alternatives considered in the
> > > > FLIP:
> > > > 1) Add a new class SlotSharingGroup that incorporates all ResourceSpec
> > > > properties. Instead of using group names, the user could directly
> > > > configure such an object.
> > > >
> > > > SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name could also be omitted and auto-generated
> > > > ssg1.setCPUCores(4);
> > > > ...
> > > > DataStream> grades =
> > > >         GradeSource
> > > >                 .getSource(env, rate)
> > > >                 .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > > >                 .slotSharingGroup(ssg1);
> > > > DataStream> salaries =
> > > >         SalarySource.getSource(env, rate)
> > > >                 .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > > >                 .slotSharingGroup(ssg2);
> > > >
> > > > // run the actual window join program with the same slot sharing group as grades
> > > > DataStream> joinedStream =
> > > >         runWindowJoin(grades, salaries, windowSize).slotSharingGroup(ssg1);
> > > >
> > > > Note that we could make it backward compatible by changing the proposed
> > > > StreamExecutionEnvironment#setSlotSharingGroupResource to
> > > > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and
> > > > then using the string name for further reference.
> > > >
> > > > 2) I'm also not sure about StreamExecutionEnvironment#setSlotSharingGroupResources.
> > > > What's the benefit of the Map version over the simple setter? Even if the
> > > > user has a map slotSharingGroupResources, they could simply do
> > > > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
> > > >
> > > > 3) Is defining the ExternalResource part of this FLIP? I don't see a
> > > > Public* class yet. I'd also be fine with cutting the scope of this FLIP,
> > > > removing it for now, and annotating ResourceSpec/SlotSharingGroup as evolving.
> > > >
> > > > 4) We should probably use a builder pattern around
> > > > ResourceSpec/SlotSharingGroup, as in the current ResourceSpec. I don't
> > > > think we need to fully specify that in the FLIP, but it would be good to
> > > > at least say how users should create them.
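Arvid's first paragraph and point 2 can be combined into one sketch. Program arguments are turned into a map of group name to CPU cores, and the map is applied with a plain per-group setter via Map#forEach. A minimal hand-written `--key value` parser stands in for ParameterTool so the example has no dependencies. `FakeEnv`, `ArgsWiring`, and the `<group>.cpu` argument convention are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an environment with a per-group resource setter.
// It is not Flink's StreamExecutionEnvironment; it only records what was set.
final class FakeEnv {
    final Map<String, Double> cpuByGroup = new LinkedHashMap<>();

    FakeEnv setSlotSharingGroupResource(String group, Double cpuCores) {
        cpuByGroup.put(group, cpuCores);
        return this;
    }
}

final class ArgsWiring {
    /**
     * Minimal "--key value" parser (a stand-in for ParameterTool) that collects
     * keys of the hypothetical form "--<group>.cpu" into group -> CPU cores.
     * Assumes arguments come in strict key/value pairs.
     */
    static Map<String, Double> cpuFromArgs(String[] args) {
        Map<String, Double> result = new LinkedHashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            String key = args[i];
            if (!key.startsWith("--") || !key.endsWith(".cpu")) {
                continue; // not a slot sharing group setting
            }
            String group = key.substring(2, key.length() - ".cpu".length());
            result.put(group, Double.parseDouble(args[i + 1]));
        }
        return result;
    }

    /** Point 2: a simple setter plus Map#forEach covers the Map-based variant. */
    static void wire(FakeEnv env, String[] args) {
        cpuFromArgs(args).forEach(env::setSlotSharingGroupResource);
    }
}
```

Changing `--ssg-1.cpu 4` to another value only requires resubmitting the job, not recompiling it.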
> > > >
> > > > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo wrote:
> > > > >
> > > > > @Yang
> > > > > In short, external resources will participate in resource
> > > > > deduction and be logically ensured, but with the current default
> > > > > resource allocation strategy, requesting an external resource must
> > > > > still be done through config options.
> > > > > In FLIP-56, we abstracted the logic of resource allocation into the
> > > > > `ResourceAllocationStrategy`. Currently, with its default
> > > > > implementation, the ResourceManager still allocates TMs with the same
> > > > > resource spec, and their external resources are configured through
> > > > > config options as well. So, in your case, you need to define
> > > > > "external-resources" and "external-resource.disk.amount". Then, all
> > > > > the disk requirements defined in the SSGs will be logically ensured,
> > > > > as there is no slot-level isolation. If the disk space of a
> > > > > TaskManager cannot fulfill a disk requirement, the RM will allocate a
> > > > > new one.
> > > > > In the future, we'd like to introduce a `ResourceAllocationStrategy`
> > > > > that allocates heterogeneous TMs according to the requirements. Then,
> > > > > users only need to define the driver of external resources when
> > > > > needed.
> > > > > Also, regarding resource isolation, we may in the future provide a
> > > > > fine-grained mode in which each slot can only fetch the information
> > > > > of the external resources it requires. But that is out of the scope
> > > > > of this PR.
> > > > >
> > > > > Best,
> > > > > Yangze Guo
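The "logically ensured" behaviour described above can be modelled in a simplified way. Every TaskManager starts with the same disk amount, as with a single cluster-wide config value. Slots only deduct from a per-TM counter, since there is no isolation. A new TM is started when no existing one has enough left. `DiskBookkeeping` is a hypothetical first-fit illustration, not Flink's ResourceAllocationStrategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bookkeeping model: homogeneous TaskManagers, logical deduction
// of disk per slot, and a new TaskManager whenever no existing one fits.
final class DiskBookkeeping {
    private final double diskPerTaskManagerGb;
    private final List<Double> remainingGb = new ArrayList<>(); // one entry per TaskManager

    DiskBookkeeping(double diskPerTaskManagerGb) {
        this.diskPerTaskManagerGb = diskPerTaskManagerGb;
    }

    /** Places a slot needing diskGb and returns the index of the TaskManager it lands on. */
    int allocateSlot(double diskGb) {
        if (diskGb > diskPerTaskManagerGb) {
            // With homogeneous TMs, such a requirement can never be fulfilled.
            throw new IllegalArgumentException("Requirement exceeds a whole TaskManager");
        }
        for (int i = 0; i < remainingGb.size(); i++) {
            if (remainingGb.get(i) >= diskGb) {
                remainingGb.set(i, remainingGb.get(i) - diskGb); // logical deduction only
                return i;
            }
        }
        remainingGb.add(diskPerTaskManagerGb - diskGb); // "RM will allocate a new one"
        return remainingGb.size() - 1;
    }

    int taskManagerCount() {
        return remainingGb.size();
    }
}
```

The model shows why heterogeneous TMs help. A requirement larger than the fixed per-TM amount is rejected outright, while a strategy that sizes TMs to the requirements could still place it.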
> > > > >
> > > > > On Tue, Jun 8, 2021 at 4:20 PM Yang Wang wrote:
> > > > > >
> > > > > > Thanks @Yangze for preparing this FLIP.
> > > > > >
> > > > > > I think this is a good starting point for community users to get a
> > > > > > taste of fine-grained resource management, which we all believe
> > > > > > could improve Flink job stability and cluster utilization.
> > > > > >
> > > > > > I have a simple question about extended resources. Is it possible to
> > > > > > combine extended resources with fine-grained resource management?
> > > > > > Besides GPUs, FPGAs, and other new computing devices, disk is perhaps
> > > > > > a more general use case. For example, different SSGs may have
> > > > > > different disk requirements depending on their state. So we need to
> > > > > > allocate enough ephemeral storage for every TaskManager pod in a
> > > > > > Kubernetes deployment. Otherwise, the pod might be evicted for
> > > > > > exceeding its limits.
> > > > > >
> > > > > > Best,
> > > > > > Yang
> > > > > >
> > > > > > On Tue, Jun 8, 2021 at 1:47 PM Xintong Song wrote:
> > > > > > >
> > > > > > > I think being able to specify fine-grained resource requirements
> > > > > > > without having to change the code and recompile the job is indeed a
> > > > > > > good idea. It definitely improves usability.
> > > > > > >
> > > > > > > However, this requires more careful design, which probably deserves a
> > > > > > > separate thread. It'd be good to have that discussion, but maybe we
> > > > > > > should not block this feature on it.
> > > > > > >
> > > > > > > One idea concerning the configuration approach: as Yangze said, Flink
> > > > > > > configuration options are supposed to take effect at the cluster
> > > > > > > level. For updating job-level specifics that are not suitable to be
> > > > > > > introduced as config options, currently the only way is to pass them
> > > > > > > as program arguments. Would it make sense to introduce a general
> > > > > > > approach for overwriting such job specifics without re-compiling the
> > > > > > > job?
> > > > > > >
> > > > > > > Thank you~
> > > > > > >
> > > > > > > Xintong Song
> > > > > > >