[DISCUSS] FLIP-119: Pipelined Region Scheduling

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

[DISCUSS] FLIP-119: Pipelined Region Scheduling

Gary Yao-4
Hi community,

In the past releases, we have been working on refactoring Flink's scheduler
with the goal of making the scheduler extensible [1]. We have rolled out
most of the intended refactoring in Flink 1.10, and we think it is now time
to leverage our newly introduced abstractions to implement a new resource
optimized scheduling strategy: Pipelined Region Scheduling.

This scheduling strategy aims at:

    * avoidance of resource deadlocks when running batch jobs

    * tunable with respect to resource consumption and throughput

More details can be found in the Wiki [2]. We are looking forward to your
feedback.

Best,

Zhu Zhu & Gary

[1] https://issues.apache.org/jira/browse/FLINK-10429

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Yangze Guo
Thanks for driving this discussion, Zhu Zhu & Gary.

I found that the image link in this FLIP is not working well. When I
open that link, Google doc told me that I have no access privilege.
Could you take a look at that issue?

Best,
Yangze Guo

On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]> wrote:

>
> Hi community,
>
> In the past releases, we have been working on refactoring Flink's scheduler
> with the goal of making the scheduler extensible [1]. We have rolled out
> most of the intended refactoring in Flink 1.10, and we think it is now time
> to leverage our newly introduced abstractions to implement a new resource
> optimized scheduling strategy: Pipelined Region Scheduling.
>
> This scheduling strategy aims at:
>
>     * avoidance of resource deadlocks when running batch jobs
>
>     * tunable with respect to resource consumption and throughput
>
> More details can be found in the Wiki [2]. We are looking forward to your
> feedback.
>
> Best,
>
> Zhu Zhu & Gary
>
> [1] https://issues.apache.org/jira/browse/FLINK-10429
>
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Zhu Zhu
Thanks for reporting this Yangze.
I have update the permission to those images. Everyone are able to view
them now.

Thanks,
Zhu Zhu

Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:

> Thanks for driving this discussion, Zhu Zhu & Gary.
>
> I found that the image link in this FLIP is not working well. When I
> open that link, Google doc told me that I have no access privilege.
> Could you take a look at that issue?
>
> Best,
> Yangze Guo
>
> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]> wrote:
> >
> > Hi community,
> >
> > In the past releases, we have been working on refactoring Flink's
> scheduler
> > with the goal of making the scheduler extensible [1]. We have rolled out
> > most of the intended refactoring in Flink 1.10, and we think it is now
> time
> > to leverage our newly introduced abstractions to implement a new resource
> > optimized scheduling strategy: Pipelined Region Scheduling.
> >
> > This scheduling strategy aims at:
> >
> >     * avoidance of resource deadlocks when running batch jobs
> >
> >     * tunable with respect to resource consumption and throughput
> >
> > More details can be found in the Wiki [2]. We are looking forward to your
> > feedback.
> >
> > Best,
> >
> > Zhu Zhu & Gary
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> >
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Yangze Guo
Thanks for updating!

+1 for supporting the pipelined region scheduling. Although we could
not prevent resource deadlock in all scenarios, it is really a big
step.

The design generally LGTM.

One minor thing I want to make sure. If I understand correctly, the
blocking edge will not be consumable before the upstream is finished.
Without it, when the failure occurs in the upstream region, there is
still possible to have a resource deadlock. I don't know whether it is
an explicit protocol now. But after this FLIP, I think it should not
be broken.
I'm also wondering could we execute the upstream and downstream
regions at the same time if we have enough resources. It can shorten
the running time of large job. We should not break the protocol of
blocking edge. But if it is possible to change the data exchange mode
of two regions dynamically?

Best,
Yangze Guo

On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]> wrote:

>
> Thanks for reporting this Yangze.
> I have update the permission to those images. Everyone are able to view them now.
>
> Thanks,
> Zhu Zhu
>
> Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
>>
>> Thanks for driving this discussion, Zhu Zhu & Gary.
>>
>> I found that the image link in this FLIP is not working well. When I
>> open that link, Google doc told me that I have no access privilege.
>> Could you take a look at that issue?
>>
>> Best,
>> Yangze Guo
>>
>> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]> wrote:
>> >
>> > Hi community,
>> >
>> > In the past releases, we have been working on refactoring Flink's scheduler
>> > with the goal of making the scheduler extensible [1]. We have rolled out
>> > most of the intended refactoring in Flink 1.10, and we think it is now time
>> > to leverage our newly introduced abstractions to implement a new resource
>> > optimized scheduling strategy: Pipelined Region Scheduling.
>> >
>> > This scheduling strategy aims at:
>> >
>> >     * avoidance of resource deadlocks when running batch jobs
>> >
>> >     * tunable with respect to resource consumption and throughput
>> >
>> > More details can be found in the Wiki [2]. We are looking forward to your
>> > feedback.
>> >
>> > Best,
>> >
>> > Zhu Zhu & Gary
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-10429
>> >
>> > [2]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Zhu Zhu
To Yangze,

>> the blocking edge will not be consumable before the upstream is finished.
Yes. This is how we define a BLOCKING result partition, "Blocking
partitions represent blocking data exchanges, where the data stream is
first fully produced and then consumed".

>> I'm also wondering could we execute the upstream and downstream regions
at the same time if we have enough resources
It may lead to resource waste since the tasks in downstream regions cannot
read any data before the upstream region finishes. It saves a bit time on
schedule, but usually it does not make much difference for large jobs,
since data processing takes much more time. For small jobs, one can make
all edges PIPELINED so that all the tasks can be scheduled at the same time.

>> is it possible to change the data exchange mode of two regions
dynamically?
This is not in the scope of the FLIP. But we are moving forward to a more
extensible scheduler (FLINK-10429) and resource aware scheduling
(FLINK-10407).
So I think it's possible we can have a scheduler in the future which
dynamically changes the shuffle type wisely regarding available resources.

Thanks,
Zhu Zhu

Yangze Guo <[hidden email]> 于2020年3月27日周五 下午4:49写道:

> Thanks for updating!
>
> +1 for supporting the pipelined region scheduling. Although we could
> not prevent resource deadlock in all scenarios, it is really a big
> step.
>
> The design generally LGTM.
>
> One minor thing I want to make sure. If I understand correctly, the
> blocking edge will not be consumable before the upstream is finished.
> Without it, when the failure occurs in the upstream region, there is
> still possible to have a resource deadlock. I don't know whether it is
> an explicit protocol now. But after this FLIP, I think it should not
> be broken.
> I'm also wondering could we execute the upstream and downstream
> regions at the same time if we have enough resources. It can shorten
> the running time of large job. We should not break the protocol of
> blocking edge. But if it is possible to change the data exchange mode
> of two regions dynamically?
>
> Best,
> Yangze Guo
>
> On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]> wrote:
> >
> > Thanks for reporting this Yangze.
> > I have update the permission to those images. Everyone are able to view
> them now.
> >
> > Thanks,
> > Zhu Zhu
> >
> > Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
> >>
> >> Thanks for driving this discussion, Zhu Zhu & Gary.
> >>
> >> I found that the image link in this FLIP is not working well. When I
> >> open that link, Google doc told me that I have no access privilege.
> >> Could you take a look at that issue?
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]> wrote:
> >> >
> >> > Hi community,
> >> >
> >> > In the past releases, we have been working on refactoring Flink's
> scheduler
> >> > with the goal of making the scheduler extensible [1]. We have rolled
> out
> >> > most of the intended refactoring in Flink 1.10, and we think it is
> now time
> >> > to leverage our newly introduced abstractions to implement a new
> resource
> >> > optimized scheduling strategy: Pipelined Region Scheduling.
> >> >
> >> > This scheduling strategy aims at:
> >> >
> >> >     * avoidance of resource deadlocks when running batch jobs
> >> >
> >> >     * tunable with respect to resource consumption and throughput
> >> >
> >> > More details can be found in the Wiki [2]. We are looking forward to
> your
> >> > feedback.
> >> >
> >> > Best,
> >> >
> >> > Zhu Zhu & Gary
> >> >
> >> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> >> >
> >> > [2]
> >> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Xintong Song
Gary & Zhu Zhu,

Thanks for preparing this FLIP, and a BIG +1 from my side. The trade-off
between resource utilization and potential deadlock problems has always
been a pain. Despite not solving all the deadlock cases, this FLIP is
definitely a big improvement. IIUC, it has already covered all the existing
single job cases, and all the mentioned non-covered cases are either in
multi-job session clusters or with diverse slot resources in future.

I've read through the FLIP, and it looks really good to me. Good job! All
the concerns and limitations that I can think of have already been clearly
stated, with reasonable potential future solutions. From the perspective of
fine-grained resource management, I do not see any serious/irresolvable
conflict at this time.

nit: The in-page links are not working. I guess those are copied from
google docs directly?


Thank you~

Xintong Song



On Fri, Mar 27, 2020 at 6:26 PM Zhu Zhu <[hidden email]> wrote:

> To Yangze,
>
> >> the blocking edge will not be consumable before the upstream is
> finished.
> Yes. This is how we define a BLOCKING result partition, "Blocking
> partitions represent blocking data exchanges, where the data stream is
> first fully produced and then consumed".
>
> >> I'm also wondering could we execute the upstream and downstream regions
> at the same time if we have enough resources
> It may lead to resource waste since the tasks in downstream regions cannot
> read any data before the upstream region finishes. It saves a bit time on
> schedule, but usually it does not make much difference for large jobs,
> since data processing takes much more time. For small jobs, one can make
> all edges PIPELINED so that all the tasks can be scheduled at the same
> time.
>
> >> is it possible to change the data exchange mode of two regions
> dynamically?
> This is not in the scope of the FLIP. But we are moving forward to a more
> extensible scheduler (FLINK-10429) and resource aware scheduling
> (FLINK-10407).
> So I think it's possible we can have a scheduler in the future which
> dynamically changes the shuffle type wisely regarding available resources.
>
> Thanks,
> Zhu Zhu
>
> Yangze Guo <[hidden email]> 于2020年3月27日周五 下午4:49写道:
>
> > Thanks for updating!
> >
> > +1 for supporting the pipelined region scheduling. Although we could
> > not prevent resource deadlock in all scenarios, it is really a big
> > step.
> >
> > The design generally LGTM.
> >
> > One minor thing I want to make sure. If I understand correctly, the
> > blocking edge will not be consumable before the upstream is finished.
> > Without it, when the failure occurs in the upstream region, there is
> > still possible to have a resource deadlock. I don't know whether it is
> > an explicit protocol now. But after this FLIP, I think it should not
> > be broken.
> > I'm also wondering could we execute the upstream and downstream
> > regions at the same time if we have enough resources. It can shorten
> > the running time of large job. We should not break the protocol of
> > blocking edge. But if it is possible to change the data exchange mode
> > of two regions dynamically?
> >
> > Best,
> > Yangze Guo
> >
> > On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]> wrote:
> > >
> > > Thanks for reporting this Yangze.
> > > I have update the permission to those images. Everyone are able to view
> > them now.
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> > > Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
> > >>
> > >> Thanks for driving this discussion, Zhu Zhu & Gary.
> > >>
> > >> I found that the image link in this FLIP is not working well. When I
> > >> open that link, Google doc told me that I have no access privilege.
> > >> Could you take a look at that issue?
> > >>
> > >> Best,
> > >> Yangze Guo
> > >>
> > >> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]> wrote:
> > >> >
> > >> > Hi community,
> > >> >
> > >> > In the past releases, we have been working on refactoring Flink's
> > scheduler
> > >> > with the goal of making the scheduler extensible [1]. We have rolled
> > out
> > >> > most of the intended refactoring in Flink 1.10, and we think it is
> > now time
> > >> > to leverage our newly introduced abstractions to implement a new
> > resource
> > >> > optimized scheduling strategy: Pipelined Region Scheduling.
> > >> >
> > >> > This scheduling strategy aims at:
> > >> >
> > >> >     * avoidance of resource deadlocks when running batch jobs
> > >> >
> > >> >     * tunable with respect to resource consumption and throughput
> > >> >
> > >> > More details can be found in the Wiki [2]. We are looking forward to
> > your
> > >> > feedback.
> > >> >
> > >> > Best,
> > >> >
> > >> > Zhu Zhu & Gary
> > >> >
> > >> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> > >> >
> > >> > [2]
> > >> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Till Rohrmann
Thanks for creating this FLIP Zhu Zhu and Gary!

+1 for adding pipelined region scheduling.

Concerning the extended SlotProvider interface I have an idea how we could
further improve it. If I am not mistaken, then you have proposed to
introduce the two timeouts in order to distinguish between batch and
streaming jobs and to encode that batch job requests can wait if there are
enough resources in the SlotPool (not necessarily being available right
now). I think what we actually need to tell the SlotProvider is whether a
request will use the slot only for a limited time or not. This is exactly
the difference between processing bounded and unbounded streams. If the
SlotProvider knows this difference, then it can tell which slots will
eventually be reusable and which not. Based on this it can tell whether a
slot request can be fulfilled eventually or whether we fail after the
specified timeout. Another benefit of this approach would be that we can
easily support mixed bounded/unbounded workloads. What we would need to
know for this approach is whether a pipelined region is processing a
bounded or unbounded stream.

To give an example let's assume we request the following sets of slots
where each pipelined region requires the same slots:

slotProvider.allocateSlots(pr1_bounded, timeout);
slotProvider.allocateSlots(pr2_unbounded, timeout);
slotProvider.allocateSlots(pr3_bounded, timeout);

Let's assume we receive slots for pr1_bounded in < timeout and can then
fulfill the request. Then we request pr2_unbounded. Since we know that
pr1_bounded will complete eventually, we don't fail this request after
timeout. Next we request pr3_bounded after pr2_unbounded has been
completed. In this case, we see that we need to request new resources
because pr2_unbounded won't release its slots. Hence, if we cannot allocate
new resources within timeout, we fail this request.

A small comment concerning "Resource deadlocks when slot allocation
competition happens between multiple jobs in a session cluster": Another
idea to solve this situation would be to give the ResourceManager the right
to revoke slot assignments in order to change the mapping between requests
and available slots.

Cheers,
Till

On Fri, Mar 27, 2020 at 12:44 PM Xintong Song <[hidden email]> wrote:

> Gary & Zhu Zhu,
>
> Thanks for preparing this FLIP, and a BIG +1 from my side. The trade-off
> between resource utilization and potential deadlock problems has always
> been a pain. Despite not solving all the deadlock cases, this FLIP is
> definitely a big improvement. IIUC, it has already covered all the existing
> single job cases, and all the mentioned non-covered cases are either in
> multi-job session clusters or with diverse slot resources in future.
>
> I've read through the FLIP, and it looks really good to me. Good job! All
> the concerns and limitations that I can think of have already been clearly
> stated, with reasonable potential future solutions. From the perspective of
> fine-grained resource management, I do not see any serious/irresolvable
> conflict at this time.
>
> nit: The in-page links are not working. I guess those are copied from
> google docs directly?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Mar 27, 2020 at 6:26 PM Zhu Zhu <[hidden email]> wrote:
>
> > To Yangze,
> >
> > >> the blocking edge will not be consumable before the upstream is
> > finished.
> > Yes. This is how we define a BLOCKING result partition, "Blocking
> > partitions represent blocking data exchanges, where the data stream is
> > first fully produced and then consumed".
> >
> > >> I'm also wondering could we execute the upstream and downstream
> regions
> > at the same time if we have enough resources
> > It may lead to resource waste since the tasks in downstream regions
> cannot
> > read any data before the upstream region finishes. It saves a bit time on
> > schedule, but usually it does not make much difference for large jobs,
> > since data processing takes much more time. For small jobs, one can make
> > all edges PIPELINED so that all the tasks can be scheduled at the same
> > time.
> >
> > >> is it possible to change the data exchange mode of two regions
> > dynamically?
> > This is not in the scope of the FLIP. But we are moving forward to a more
> > extensible scheduler (FLINK-10429) and resource aware scheduling
> > (FLINK-10407).
> > So I think it's possible we can have a scheduler in the future which
> > dynamically changes the shuffle type wisely regarding available
> resources.
> >
> > Thanks,
> > Zhu Zhu
> >
> > Yangze Guo <[hidden email]> 于2020年3月27日周五 下午4:49写道:
> >
> > > Thanks for updating!
> > >
> > > +1 for supporting the pipelined region scheduling. Although we could
> > > not prevent resource deadlock in all scenarios, it is really a big
> > > step.
> > >
> > > The design generally LGTM.
> > >
> > > One minor thing I want to make sure. If I understand correctly, the
> > > blocking edge will not be consumable before the upstream is finished.
> > > Without it, when the failure occurs in the upstream region, there is
> > > still possible to have a resource deadlock. I don't know whether it is
> > > an explicit protocol now. But after this FLIP, I think it should not
> > > be broken.
> > > I'm also wondering could we execute the upstream and downstream
> > > regions at the same time if we have enough resources. It can shorten
> > > the running time of large job. We should not break the protocol of
> > > blocking edge. But if it is possible to change the data exchange mode
> > > of two regions dynamically?
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]> wrote:
> > > >
> > > > Thanks for reporting this Yangze.
> > > > I have update the permission to those images. Everyone are able to
> view
> > > them now.
> > > >
> > > > Thanks,
> > > > Zhu Zhu
> > > >
> > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
> > > >>
> > > >> Thanks for driving this discussion, Zhu Zhu & Gary.
> > > >>
> > > >> I found that the image link in this FLIP is not working well. When I
> > > >> open that link, Google doc told me that I have no access privilege.
> > > >> Could you take a look at that issue?
> > > >>
> > > >> Best,
> > > >> Yangze Guo
> > > >>
> > > >> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]> wrote:
> > > >> >
> > > >> > Hi community,
> > > >> >
> > > >> > In the past releases, we have been working on refactoring Flink's
> > > scheduler
> > > >> > with the goal of making the scheduler extensible [1]. We have
> rolled
> > > out
> > > >> > most of the intended refactoring in Flink 1.10, and we think it is
> > > now time
> > > >> > to leverage our newly introduced abstractions to implement a new
> > > resource
> > > >> > optimized scheduling strategy: Pipelined Region Scheduling.
> > > >> >
> > > >> > This scheduling strategy aims at:
> > > >> >
> > > >> >     * avoidance of resource deadlocks when running batch jobs
> > > >> >
> > > >> >     * tunable with respect to resource consumption and throughput
> > > >> >
> > > >> > More details can be found in the Wiki [2]. We are looking forward
> to
> > > your
> > > >> > feedback.
> > > >> >
> > > >> > Best,
> > > >> >
> > > >> > Zhu Zhu & Gary
> > > >> >
> > > >> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> > > >> >
> > > >> > [2]
> > > >> >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Zhu Zhu
Thanks for the comments!

To Xintong,
It's a bit strange since the in page links work as expected. Would you take
another try?

To Till,
- Regarding the idea to improve to SlotProvider interface
I think it is a good idea and thanks a lot! In the current design we make
slot requests for batch jobs to wait for resources without timeout as long
as the JM see enough slots overall. This implicitly add assumption that
tasks can finish and slots are be returned. This, however, would not work
in the mixed bounded/unbounded workloads as you mentioned.
Your idea looks more clear that it always allow slot allocations to wait
and not time out as long as it see enough slots. And the 'enough' check is
with regard to slots that can be returned (for bounded tasks) and slots
that will be occupied forever (for unbounded tasks), so that streaming jobs
can naturally throw slot allocation timeout errors if the cluster does not
have enough resources for all the tasks to run at the same time.
I will take a deeper thought to see how we can implement it this way.

- Regarding the idea to solve "Resource deadlocks when slot allocation
competition happens between multiple jobs in a session cluster"
Agreed it's also possible to let the RM to revoke the slots to unblock the
oldest bulk of requests first. That would require some extra work to change
the RM to holds the requests before it is sure the slots are successfully
assigned to the JM (currently the RM removes pending requests right after
the requests are sent to TM without confirming wether the slot offers
succeed). We can look deeper into it later when we are about to support
variant sizes slots.

Thanks,
Zhu Zhu


Till Rohrmann <[hidden email]> 于2020年3月27日周五 下午10:59写道:

> Thanks for creating this FLIP Zhu Zhu and Gary!
>
> +1 for adding pipelined region scheduling.
>
> Concerning the extended SlotProvider interface I have an idea how we could
> further improve it. If I am not mistaken, then you have proposed to
> introduce the two timeouts in order to distinguish between batch and
> streaming jobs and to encode that batch job requests can wait if there are
> enough resources in the SlotPool (not necessarily being available right
> now). I think what we actually need to tell the SlotProvider is whether a
> request will use the slot only for a limited time or not. This is exactly
> the difference between processing bounded and unbounded streams. If the
> SlotProvider knows this difference, then it can tell which slots will
> eventually be reusable and which not. Based on this it can tell whether a
> slot request can be fulfilled eventually or whether we fail after the
> specified timeout. Another benefit of this approach would be that we can
> easily support mixed bounded/unbounded workloads. What we would need to
> know for this approach is whether a pipelined region is processing a
> bounded or unbounded stream.
>
> To give an example let's assume we request the following sets of slots
> where each pipelined region requires the same slots:
>
> slotProvider.allocateSlots(pr1_bounded, timeout);
> slotProvider.allocateSlots(pr2_unbounded, timeout);
> slotProvider.allocateSlots(pr3_bounded, timeout);
>
> Let's assume we receive slots for pr1_bounded in < timeout and can then
> fulfill the request. Then we request pr2_unbounded. Since we know that
> pr1_bounded will complete eventually, we don't fail this request after
> timeout. Next we request pr3_bounded after pr2_unbounded has been
> completed. In this case, we see that we need to request new resources
> because pr2_unbounded won't release its slots. Hence, if we cannot allocate
> new resources within timeout, we fail this request.
>
> A small comment concerning "Resource deadlocks when slot allocation
> competition happens between multiple jobs in a session cluster": Another
> idea to solve this situation would be to give the ResourceManager the right
> to revoke slot assignments in order to change the mapping between requests
> and available slots.
>
> Cheers,
> Till
>
> On Fri, Mar 27, 2020 at 12:44 PM Xintong Song <[hidden email]>
> wrote:
>
> > Gary & Zhu Zhu,
> >
> > Thanks for preparing this FLIP, and a BIG +1 from my side. The trade-off
> > between resource utilization and potential deadlock problems has always
> > been a pain. Despite not solving all the deadlock cases, this FLIP is
> > definitely a big improvement. IIUC, it has already covered all the
> existing
> > single job cases, and all the mentioned non-covered cases are either in
> > multi-job session clusters or with diverse slot resources in future.
> >
> > I've read through the FLIP, and it looks really good to me. Good job! All
> > the concerns and limitations that I can think of have already been
> clearly
> > stated, with reasonable potential future solutions. From the perspective
> of
> > fine-grained resource management, I do not see any serious/irresolvable
> > conflict at this time.
> >
> > nit: The in-page links are not working. I guess those are copied from
> > google docs directly?
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Fri, Mar 27, 2020 at 6:26 PM Zhu Zhu <[hidden email]> wrote:
> >
> > > To Yangze,
> > >
> > > >> the blocking edge will not be consumable before the upstream is
> > > finished.
> > > Yes. This is how we define a BLOCKING result partition, "Blocking
> > > partitions represent blocking data exchanges, where the data stream is
> > > first fully produced and then consumed".
> > >
> > > >> I'm also wondering could we execute the upstream and downstream
> > regions
> > > at the same time if we have enough resources
> > > It may lead to resource waste since the tasks in downstream regions
> > cannot
> > > read any data before the upstream region finishes. It saves a bit time
> on
> > > schedule, but usually it does not make much difference for large jobs,
> > > since data processing takes much more time. For small jobs, one can
> make
> > > all edges PIPELINED so that all the tasks can be scheduled at the same
> > > time.
> > >
> > > >> is it possible to change the data exchange mode of two regions
> > > dynamically?
> > > This is not in the scope of the FLIP. But we are moving forward to a
> more
> > > extensible scheduler (FLINK-10429) and resource aware scheduling
> > > (FLINK-10407).
> > > So I think it's possible we can have a scheduler in the future which
> > > dynamically changes the shuffle type wisely regarding available
> > resources.
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> > > Yangze Guo <[hidden email]> 于2020年3月27日周五 下午4:49写道:
> > >
> > > > Thanks for updating!
> > > >
> > > > +1 for supporting the pipelined region scheduling. Although we could
> > > > not prevent resource deadlock in all scenarios, it is really a big
> > > > step.
> > > >
> > > > The design generally LGTM.
> > > >
> > > > One minor thing I want to make sure. If I understand correctly, the
> > > > blocking edge will not be consumable before the upstream is finished.
> > > > Without it, when the failure occurs in the upstream region, there is
> > > > still possible to have a resource deadlock. I don't know whether it
> is
> > > > an explicit protocol now. But after this FLIP, I think it should not
> > > > be broken.
> > > > I'm also wondering could we execute the upstream and downstream
> > > > regions at the same time if we have enough resources. It can shorten
> > > > the running time of large job. We should not break the protocol of
> > > > blocking edge. But if it is possible to change the data exchange mode
> > > > of two regions dynamically?
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]> wrote:
> > > > >
> > > > > Thanks for reporting this Yangze.
> > > > > I have update the permission to those images. Everyone are able to
> > view
> > > > them now.
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
> > > > >>
> > > > >> Thanks for driving this discussion, Zhu Zhu & Gary.
> > > > >>
> > > > >> I found that the image link in this FLIP is not working well.
> When I
> > > > >> open that link, Google doc told me that I have no access
> privilege.
> > > > >> Could you take a look at that issue?
> > > > >>
> > > > >> Best,
> > > > >> Yangze Guo
> > > > >>
> > > > >> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]> wrote:
> > > > >> >
> > > > >> > Hi community,
> > > > >> >
> > > > >> > In the past releases, we have been working on refactoring
> Flink's
> > > > scheduler
> > > > >> > with the goal of making the scheduler extensible [1]. We have
> > rolled
> > > > out
> > > > >> > most of the intended refactoring in Flink 1.10, and we think it
> is
> > > > now time
> > > > >> > to leverage our newly introduced abstractions to implement a new
> > > > resource
> > > > >> > optimized scheduling strategy: Pipelined Region Scheduling.
> > > > >> >
> > > > >> > This scheduling strategy aims at:
> > > > >> >
> > > > >> >     * avoidance of resource deadlocks when running batch jobs
> > > > >> >
> > > > >> >     * tunable with respect to resource consumption and
> throughput
> > > > >> >
> > > > >> > More details can be found in the Wiki [2]. We are looking
> forward
> > to
> > > > your
> > > > >> > feedback.
> > > > >> >
> > > > >> > Best,
> > > > >> >
> > > > >> > Zhu Zhu & Gary
> > > > >> >
> > > > >> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> > > > >> >
> > > > >> > [2]
> > > > >> >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Xintong Song
@ZhuZhu

The links work for me now. Someone might have fixed them. Never mind.

Thank you~

Xintong Song



On Mon, Mar 30, 2020 at 1:31 AM Zhu Zhu <[hidden email]> wrote:

> Thanks for the comments!
>
> To Xintong,
> It's a bit strange since the in page links work as expected. Would you take
> another try?
>
> To Till,
> - Regarding the idea to improve to SlotProvider interface
> I think it is a good idea and thanks a lot! In the current design we make
> slot requests for batch jobs to wait for resources without timeout as long
> as the JM see enough slots overall. This implicitly add assumption that
> tasks can finish and slots are be returned. This, however, would not work
> in the mixed bounded/unbounded workloads as you mentioned.
> Your idea looks more clear that it always allow slot allocations to wait
> and not time out as long as it see enough slots. And the 'enough' check is
> with regard to slots that can be returned (for bounded tasks) and slots
> that will be occupied forever (for unbounded tasks), so that streaming jobs
> can naturally throw slot allocation timeout errors if the cluster does not
> have enough resources for all the tasks to run at the same time.
> I will take a deeper thought to see how we can implement it this way.
>
> - Regarding the idea to solve "Resource deadlocks when slot allocation
> competition happens between multiple jobs in a session cluster"
> Agreed it's also possible to let the RM to revoke the slots to unblock the
> oldest bulk of requests first. That would require some extra work to change
> the RM to holds the requests before it is sure the slots are successfully
> assigned to the JM (currently the RM removes pending requests right after
> the requests are sent to TM without confirming wether the slot offers
> succeed). We can look deeper into it later when we are about to support
> variant sizes slots.
>
> Thanks,
> Zhu Zhu
>
>
> Till Rohrmann <[hidden email]> 于2020年3月27日周五 下午10:59写道:
>
> > Thanks for creating this FLIP Zhu Zhu and Gary!
> >
> > +1 for adding pipelined region scheduling.
> >
> > Concerning the extended SlotProvider interface I have an idea how we
> could
> > further improve it. If I am not mistaken, then you have proposed to
> > introduce the two timeouts in order to distinguish between batch and
> > streaming jobs and to encode that batch job requests can wait if there
> are
> > enough resources in the SlotPool (not necessarily being available right
> > now). I think what we actually need to tell the SlotProvider is whether a
> > request will use the slot only for a limited time or not. This is exactly
> > the difference between processing bounded and unbounded streams. If the
> > SlotProvider knows this difference, then it can tell which slots will
> > eventually be reusable and which not. Based on this it can tell whether a
> > slot request can be fulfilled eventually or whether we fail after the
> > specified timeout. Another benefit of this approach would be that we can
> > easily support mixed bounded/unbounded workloads. What we would need to
> > know for this approach is whether a pipelined region is processing a
> > bounded or unbounded stream.
> >
> > To give an example let's assume we request the following sets of slots
> > where each pipelined region requires the same slots:
> >
> > slotProvider.allocateSlots(pr1_bounded, timeout);
> > slotProvider.allocateSlots(pr2_unbounded, timeout);
> > slotProvider.allocateSlots(pr3_bounded, timeout);
> >
> > Let's assume we receive slots for pr1_bounded in < timeout and can then
> > fulfill the request. Then we request pr2_unbounded. Since we know that
> > pr1_bounded will complete eventually, we don't fail this request after
> > timeout. Next we request pr3_bounded after pr2_unbounded has been
> > completed. In this case, we see that we need to request new resources
> > because pr2_unbounded won't release its slots. Hence, if we cannot
> allocate
> > new resources within timeout, we fail this request.
> >
> > A small comment concerning "Resource deadlocks when slot allocation
> > competition happens between multiple jobs in a session cluster": Another
> > idea to solve this situation would be to give the ResourceManager the
> right
> > to revoke slot assignments in order to change the mapping between
> requests
> > and available slots.
> >
> > Cheers,
> > Till
> >
> > On Fri, Mar 27, 2020 at 12:44 PM Xintong Song <[hidden email]>
> > wrote:
> >
> > > Gary & Zhu Zhu,
> > >
> > > Thanks for preparing this FLIP, and a BIG +1 from my side. The
> trade-off
> > > between resource utilization and potential deadlock problems has always
> > > been a pain. Despite not solving all the deadlock cases, this FLIP is
> > > definitely a big improvement. IIUC, it has already covered all the
> > existing
> > > single job cases, and all the mentioned non-covered cases are either in
> > > multi-job session clusters or with diverse slot resources in future.
> > >
> > > I've read through the FLIP, and it looks really good to me. Good job!
> All
> > > the concerns and limitations that I can think of have already been
> > clearly
> > > stated, with reasonable potential future solutions. From the
> perspective
> > of
> > > fine-grained resource management, I do not see any serious/irresolvable
> > > conflict at this time.
> > >
> > > nit: The in-page links are not working. I guess those are copied from
> > > google docs directly?
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Fri, Mar 27, 2020 at 6:26 PM Zhu Zhu <[hidden email]> wrote:
> > >
> > > > To Yangze,
> > > >
> > > > >> the blocking edge will not be consumable before the upstream is
> > > > finished.
> > > > Yes. This is how we define a BLOCKING result partition, "Blocking
> > > > partitions represent blocking data exchanges, where the data stream
> is
> > > > first fully produced and then consumed".
> > > >
> > > > >> I'm also wondering could we execute the upstream and downstream
> > > regions
> > > > at the same time if we have enough resources
> > > > It may lead to resource waste since the tasks in downstream regions
> > > cannot
> > > > read any data before the upstream region finishes. It saves a bit
> time
> > on
> > > > schedule, but usually it does not make much difference for large
> jobs,
> > > > since data processing takes much more time. For small jobs, one can
> > make
> > > > all edges PIPELINED so that all the tasks can be scheduled at the
> same
> > > > time.
> > > >
> > > > >> is it possible to change the data exchange mode of two regions
> > > > dynamically?
> > > > This is not in the scope of the FLIP. But we are moving forward to a
> > more
> > > > extensible scheduler (FLINK-10429) and resource aware scheduling
> > > > (FLINK-10407).
> > > > So I think it's possible we can have a scheduler in the future which
> > > > dynamically changes the shuffle type wisely regarding available
> > > resources.
> > > >
> > > > Thanks,
> > > > Zhu Zhu
> > > >
> > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 下午4:49写道:
> > > >
> > > > > Thanks for updating!
> > > > >
> > > > > +1 for supporting the pipelined region scheduling. Although we
> could
> > > > > not prevent resource deadlock in all scenarios, it is really a big
> > > > > step.
> > > > >
> > > > > The design generally LGTM.
> > > > >
> > > > > One minor thing I want to make sure. If I understand correctly, the
> > > > > blocking edge will not be consumable before the upstream is
> finished.
> > > > > Without it, when the failure occurs in the upstream region, there
> is
> > > > > still possible to have a resource deadlock. I don't know whether it
> > is
> > > > > an explicit protocol now. But after this FLIP, I think it should
> not
> > > > > be broken.
> > > > > I'm also wondering could we execute the upstream and downstream
> > > > > regions at the same time if we have enough resources. It can
> shorten
> > > > > the running time of large job. We should not break the protocol of
> > > > > blocking edge. But if it is possible to change the data exchange
> mode
> > > > > of two regions dynamically?
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]> wrote:
> > > > > >
> > > > > > Thanks for reporting this Yangze.
> > > > > > I have update the permission to those images. Everyone are able
> to
> > > view
> > > > > them now.
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu Zhu
> > > > > >
> > > > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
> > > > > >>
> > > > > >> Thanks for driving this discussion, Zhu Zhu & Gary.
> > > > > >>
> > > > > >> I found that the image link in this FLIP is not working well.
> > When I
> > > > > >> open that link, Google doc told me that I have no access
> > privilege.
> > > > > >> Could you take a look at that issue?
> > > > > >>
> > > > > >> Best,
> > > > > >> Yangze Guo
> > > > > >>
> > > > > >> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]>
> wrote:
> > > > > >> >
> > > > > >> > Hi community,
> > > > > >> >
> > > > > >> > In the past releases, we have been working on refactoring
> > Flink's
> > > > > scheduler
> > > > > >> > with the goal of making the scheduler extensible [1]. We have
> > > rolled
> > > > > out
> > > > > >> > most of the intended refactoring in Flink 1.10, and we think
> it
> > is
> > > > > now time
> > > > > >> > to leverage our newly introduced abstractions to implement a
> new
> > > > > resource
> > > > > >> > optimized scheduling strategy: Pipelined Region Scheduling.
> > > > > >> >
> > > > > >> > This scheduling strategy aims at:
> > > > > >> >
> > > > > >> >     * avoidance of resource deadlocks when running batch jobs
> > > > > >> >
> > > > > >> >     * tunable with respect to resource consumption and
> > throughput
> > > > > >> >
> > > > > >> > More details can be found in the Wiki [2]. We are looking
> > forward
> > > to
> > > > > your
> > > > > >> > feedback.
> > > > > >> >
> > > > > >> > Best,
> > > > > >> >
> > > > > >> > Zhu Zhu & Gary
> > > > > >> >
> > > > > >> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> > > > > >> >
> > > > > >> > [2]
> > > > > >> >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Yangze Guo
In reply to this post by Zhu Zhu
Hi, Zhu Zhu,

Thanks for the explanation and information. I also agree that it out
of the scope of this FLIP and glad to see that we are moving forward
to a more extensible scheduler!

Best,
Yangze Guo

On Mon, Mar 30, 2020 at 1:31 AM Zhu Zhu <[hidden email]> wrote:

>
> Thanks for the comments!
>
> To Xintong,
> It's a bit strange since the in page links work as expected. Would you take
> another try?
>
> To Till,
> - Regarding the idea to improve to SlotProvider interface
> I think it is a good idea and thanks a lot! In the current design we make
> slot requests for batch jobs to wait for resources without timeout as long
> as the JM see enough slots overall. This implicitly add assumption that
> tasks can finish and slots are be returned. This, however, would not work
> in the mixed bounded/unbounded workloads as you mentioned.
> Your idea looks more clear that it always allow slot allocations to wait
> and not time out as long as it see enough slots. And the 'enough' check is
> with regard to slots that can be returned (for bounded tasks) and slots
> that will be occupied forever (for unbounded tasks), so that streaming jobs
> can naturally throw slot allocation timeout errors if the cluster does not
> have enough resources for all the tasks to run at the same time.
> I will take a deeper thought to see how we can implement it this way.
>
> - Regarding the idea to solve "Resource deadlocks when slot allocation
> competition happens between multiple jobs in a session cluster"
> Agreed it's also possible to let the RM to revoke the slots to unblock the
> oldest bulk of requests first. That would require some extra work to change
> the RM to holds the requests before it is sure the slots are successfully
> assigned to the JM (currently the RM removes pending requests right after
> the requests are sent to TM without confirming wether the slot offers
> succeed). We can look deeper into it later when we are about to support
> variant sizes slots.
>
> Thanks,
> Zhu Zhu
>
>
> Till Rohrmann <[hidden email]> 于2020年3月27日周五 下午10:59写道:
>
> > Thanks for creating this FLIP Zhu Zhu and Gary!
> >
> > +1 for adding pipelined region scheduling.
> >
> > Concerning the extended SlotProvider interface I have an idea how we could
> > further improve it. If I am not mistaken, then you have proposed to
> > introduce the two timeouts in order to distinguish between batch and
> > streaming jobs and to encode that batch job requests can wait if there are
> > enough resources in the SlotPool (not necessarily being available right
> > now). I think what we actually need to tell the SlotProvider is whether a
> > request will use the slot only for a limited time or not. This is exactly
> > the difference between processing bounded and unbounded streams. If the
> > SlotProvider knows this difference, then it can tell which slots will
> > eventually be reusable and which not. Based on this it can tell whether a
> > slot request can be fulfilled eventually or whether we fail after the
> > specified timeout. Another benefit of this approach would be that we can
> > easily support mixed bounded/unbounded workloads. What we would need to
> > know for this approach is whether a pipelined region is processing a
> > bounded or unbounded stream.
> >
> > To give an example let's assume we request the following sets of slots
> > where each pipelined region requires the same slots:
> >
> > slotProvider.allocateSlots(pr1_bounded, timeout);
> > slotProvider.allocateSlots(pr2_unbounded, timeout);
> > slotProvider.allocateSlots(pr3_bounded, timeout);
> >
> > Let's assume we receive slots for pr1_bounded in < timeout and can then
> > fulfill the request. Then we request pr2_unbounded. Since we know that
> > pr1_bounded will complete eventually, we don't fail this request after
> > timeout. Next we request pr3_bounded after pr2_unbounded has been
> > completed. In this case, we see that we need to request new resources
> > because pr2_unbounded won't release its slots. Hence, if we cannot allocate
> > new resources within timeout, we fail this request.
> >
> > A small comment concerning "Resource deadlocks when slot allocation
> > competition happens between multiple jobs in a session cluster": Another
> > idea to solve this situation would be to give the ResourceManager the right
> > to revoke slot assignments in order to change the mapping between requests
> > and available slots.
> >
> > Cheers,
> > Till
> >
> > On Fri, Mar 27, 2020 at 12:44 PM Xintong Song <[hidden email]>
> > wrote:
> >
> > > Gary & Zhu Zhu,
> > >
> > > Thanks for preparing this FLIP, and a BIG +1 from my side. The trade-off
> > > between resource utilization and potential deadlock problems has always
> > > been a pain. Despite not solving all the deadlock cases, this FLIP is
> > > definitely a big improvement. IIUC, it has already covered all the
> > existing
> > > single job cases, and all the mentioned non-covered cases are either in
> > > multi-job session clusters or with diverse slot resources in future.
> > >
> > > I've read through the FLIP, and it looks really good to me. Good job! All
> > > the concerns and limitations that I can think of have already been
> > clearly
> > > stated, with reasonable potential future solutions. From the perspective
> > of
> > > fine-grained resource management, I do not see any serious/irresolvable
> > > conflict at this time.
> > >
> > > nit: The in-page links are not working. I guess those are copied from
> > > google docs directly?
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Fri, Mar 27, 2020 at 6:26 PM Zhu Zhu <[hidden email]> wrote:
> > >
> > > > To Yangze,
> > > >
> > > > >> the blocking edge will not be consumable before the upstream is
> > > > finished.
> > > > Yes. This is how we define a BLOCKING result partition, "Blocking
> > > > partitions represent blocking data exchanges, where the data stream is
> > > > first fully produced and then consumed".
> > > >
> > > > >> I'm also wondering could we execute the upstream and downstream
> > > regions
> > > > at the same time if we have enough resources
> > > > It may lead to resource waste since the tasks in downstream regions
> > > cannot
> > > > read any data before the upstream region finishes. It saves a bit time
> > on
> > > > schedule, but usually it does not make much difference for large jobs,
> > > > since data processing takes much more time. For small jobs, one can
> > make
> > > > all edges PIPELINED so that all the tasks can be scheduled at the same
> > > > time.
> > > >
> > > > >> is it possible to change the data exchange mode of two regions
> > > > dynamically?
> > > > This is not in the scope of the FLIP. But we are moving forward to a
> > more
> > > > extensible scheduler (FLINK-10429) and resource aware scheduling
> > > > (FLINK-10407).
> > > > So I think it's possible we can have a scheduler in the future which
> > > > dynamically changes the shuffle type wisely regarding available
> > > resources.
> > > >
> > > > Thanks,
> > > > Zhu Zhu
> > > >
> > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 下午4:49写道:
> > > >
> > > > > Thanks for updating!
> > > > >
> > > > > +1 for supporting the pipelined region scheduling. Although we could
> > > > > not prevent resource deadlock in all scenarios, it is really a big
> > > > > step.
> > > > >
> > > > > The design generally LGTM.
> > > > >
> > > > > One minor thing I want to make sure. If I understand correctly, the
> > > > > blocking edge will not be consumable before the upstream is finished.
> > > > > Without it, when the failure occurs in the upstream region, there is
> > > > > still possible to have a resource deadlock. I don't know whether it
> > is
> > > > > an explicit protocol now. But after this FLIP, I think it should not
> > > > > be broken.
> > > > > I'm also wondering could we execute the upstream and downstream
> > > > > regions at the same time if we have enough resources. It can shorten
> > > > > the running time of large job. We should not break the protocol of
> > > > > blocking edge. But if it is possible to change the data exchange mode
> > > > > of two regions dynamically?
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]> wrote:
> > > > > >
> > > > > > Thanks for reporting this Yangze.
> > > > > > I have update the permission to those images. Everyone are able to
> > > view
> > > > > them now.
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu Zhu
> > > > > >
> > > > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
> > > > > >>
> > > > > >> Thanks for driving this discussion, Zhu Zhu & Gary.
> > > > > >>
> > > > > >> I found that the image link in this FLIP is not working well.
> > When I
> > > > > >> open that link, Google doc told me that I have no access
> > privilege.
> > > > > >> Could you take a look at that issue?
> > > > > >>
> > > > > >> Best,
> > > > > >> Yangze Guo
> > > > > >>
> > > > > >> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]> wrote:
> > > > > >> >
> > > > > >> > Hi community,
> > > > > >> >
> > > > > >> > In the past releases, we have been working on refactoring
> > Flink's
> > > > > scheduler
> > > > > >> > with the goal of making the scheduler extensible [1]. We have
> > > rolled
> > > > > out
> > > > > >> > most of the intended refactoring in Flink 1.10, and we think it
> > is
> > > > > now time
> > > > > >> > to leverage our newly introduced abstractions to implement a new
> > > > > resource
> > > > > >> > optimized scheduling strategy: Pipelined Region Scheduling.
> > > > > >> >
> > > > > >> > This scheduling strategy aims at:
> > > > > >> >
> > > > > >> >     * avoidance of resource deadlocks when running batch jobs
> > > > > >> >
> > > > > >> >     * tunable with respect to resource consumption and
> > throughput
> > > > > >> >
> > > > > >> > More details can be found in the Wiki [2]. We are looking
> > forward
> > > to
> > > > > your
> > > > > >> > feedback.
> > > > > >> >
> > > > > >> > Best,
> > > > > >> >
> > > > > >> > Zhu Zhu & Gary
> > > > > >> >
> > > > > >> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> > > > > >> >
> > > > > >> > [2]
> > > > > >> >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> > > > >
> > > >
> > >
> >
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Gary Yao-4
In reply to this post by Xintong Song
>
> The links work for me now. Someone might have fixed them. Never mind.
>

Actually, I fixed the links after seeing your email. Thanks for reporting.

Best,
Gary

On Mon, Mar 30, 2020 at 3:48 AM Xintong Song <[hidden email]> wrote:

> @ZhuZhu
>
> The links work for me now. Someone might have fixed them. Never mind.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Mar 30, 2020 at 1:31 AM Zhu Zhu <[hidden email]> wrote:
>
> > Thanks for the comments!
> >
> > To Xintong,
> > It's a bit strange since the in page links work as expected. Would you
> take
> > another try?
> >
> > To Till,
> > - Regarding the idea to improve to SlotProvider interface
> > I think it is a good idea and thanks a lot! In the current design we make
> > slot requests for batch jobs to wait for resources without timeout as
> long
> > as the JM see enough slots overall. This implicitly add assumption that
> > tasks can finish and slots are be returned. This, however, would not work
> > in the mixed bounded/unbounded workloads as you mentioned.
> > Your idea looks more clear that it always allow slot allocations to wait
> > and not time out as long as it see enough slots. And the 'enough' check
> is
> > with regard to slots that can be returned (for bounded tasks) and slots
> > that will be occupied forever (for unbounded tasks), so that streaming
> jobs
> > can naturally throw slot allocation timeout errors if the cluster does
> not
> > have enough resources for all the tasks to run at the same time.
> > I will take a deeper thought to see how we can implement it this way.
> >
> > - Regarding the idea to solve "Resource deadlocks when slot allocation
> > competition happens between multiple jobs in a session cluster"
> > Agreed it's also possible to let the RM to revoke the slots to unblock
> the
> > oldest bulk of requests first. That would require some extra work to
> change
> > the RM to holds the requests before it is sure the slots are successfully
> > assigned to the JM (currently the RM removes pending requests right after
> > the requests are sent to TM without confirming wether the slot offers
> > succeed). We can look deeper into it later when we are about to support
> > variant sizes slots.
> >
> > Thanks,
> > Zhu Zhu
> >
> >
> > Till Rohrmann <[hidden email]> 于2020年3月27日周五 下午10:59写道:
> >
> > > Thanks for creating this FLIP Zhu Zhu and Gary!
> > >
> > > +1 for adding pipelined region scheduling.
> > >
> > > Concerning the extended SlotProvider interface I have an idea how we
> > could
> > > further improve it. If I am not mistaken, then you have proposed to
> > > introduce the two timeouts in order to distinguish between batch and
> > > streaming jobs and to encode that batch job requests can wait if there
> > are
> > > enough resources in the SlotPool (not necessarily being available right
> > > now). I think what we actually need to tell the SlotProvider is
> whether a
> > > request will use the slot only for a limited time or not. This is
> exactly
> > > the difference between processing bounded and unbounded streams. If the
> > > SlotProvider knows this difference, then it can tell which slots will
> > > eventually be reusable and which not. Based on this it can tell
> whether a
> > > slot request can be fulfilled eventually or whether we fail after the
> > > specified timeout. Another benefit of this approach would be that we
> can
> > > easily support mixed bounded/unbounded workloads. What we would need to
> > > know for this approach is whether a pipelined region is processing a
> > > bounded or unbounded stream.
> > >
> > > To give an example let's assume we request the following sets of slots
> > > where each pipelined region requires the same slots:
> > >
> > > slotProvider.allocateSlots(pr1_bounded, timeout);
> > > slotProvider.allocateSlots(pr2_unbounded, timeout);
> > > slotProvider.allocateSlots(pr3_bounded, timeout);
> > >
> > > Let's assume we receive slots for pr1_bounded in < timeout and can then
> > > fulfill the request. Then we request pr2_unbounded. Since we know that
> > > pr1_bounded will complete eventually, we don't fail this request after
> > > timeout. Next we request pr3_bounded after pr2_unbounded has been
> > > completed. In this case, we see that we need to request new resources
> > > because pr2_unbounded won't release its slots. Hence, if we cannot
> > allocate
> > > new resources within timeout, we fail this request.
> > >
> > > A small comment concerning "Resource deadlocks when slot allocation
> > > competition happens between multiple jobs in a session cluster":
> Another
> > > idea to solve this situation would be to give the ResourceManager the
> > right
> > > to revoke slot assignments in order to change the mapping between
> > requests
> > > and available slots.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Mar 27, 2020 at 12:44 PM Xintong Song <[hidden email]>
> > > wrote:
> > >
> > > > Gary & Zhu Zhu,
> > > >
> > > > Thanks for preparing this FLIP, and a BIG +1 from my side. The
> > trade-off
> > > > between resource utilization and potential deadlock problems has
> always
> > > > been a pain. Despite not solving all the deadlock cases, this FLIP is
> > > > definitely a big improvement. IIUC, it has already covered all the
> > > existing
> > > > single job cases, and all the mentioned non-covered cases are either
> in
> > > > multi-job session clusters or with diverse slot resources in future.
> > > >
> > > > I've read through the FLIP, and it looks really good to me. Good job!
> > All
> > > > the concerns and limitations that I can think of have already been
> > > clearly
> > > > stated, with reasonable potential future solutions. From the
> > perspective
> > > of
> > > > fine-grained resource management, I do not see any
> serious/irresolvable
> > > > conflict at this time.
> > > >
> > > > nit: The in-page links are not working. I guess those are copied from
> > > > google docs directly?
> > > >
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Fri, Mar 27, 2020 at 6:26 PM Zhu Zhu <[hidden email]> wrote:
> > > >
> > > > > To Yangze,
> > > > >
> > > > > >> the blocking edge will not be consumable before the upstream is
> > > > > finished.
> > > > > Yes. This is how we define a BLOCKING result partition, "Blocking
> > > > > partitions represent blocking data exchanges, where the data stream
> > is
> > > > > first fully produced and then consumed".
> > > > >
> > > > > >> I'm also wondering could we execute the upstream and downstream
> > > > regions
> > > > > at the same time if we have enough resources
> > > > > It may lead to resource waste since the tasks in downstream regions
> > > > cannot
> > > > > read any data before the upstream region finishes. It saves a bit
> > time
> > > on
> > > > > schedule, but usually it does not make much difference for large
> > jobs,
> > > > > since data processing takes much more time. For small jobs, one can
> > > make
> > > > > all edges PIPELINED so that all the tasks can be scheduled at the
> > same
> > > > > time.
> > > > >
> > > > > >> is it possible to change the data exchange mode of two regions
> > > > > dynamically?
> > > > > This is not in the scope of the FLIP. But we are moving forward to
> a
> > > more
> > > > > extensible scheduler (FLINK-10429) and resource aware scheduling
> > > > > (FLINK-10407).
> > > > > So I think it's possible we can have a scheduler in the future
> which
> > > > > dynamically changes the shuffle type wisely regarding available
> > > > resources.
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 下午4:49写道:
> > > > >
> > > > > > Thanks for updating!
> > > > > >
> > > > > > +1 for supporting the pipelined region scheduling. Although we
> > could
> > > > > > not prevent resource deadlock in all scenarios, it is really a
> big
> > > > > > step.
> > > > > >
> > > > > > The design generally LGTM.
> > > > > >
> > > > > > One minor thing I want to make sure. If I understand correctly,
> the
> > > > > > blocking edge will not be consumable before the upstream is
> > finished.
> > > > > > Without it, when the failure occurs in the upstream region, there
> > is
> > > > > > still possible to have a resource deadlock. I don't know whether
> it
> > > is
> > > > > > an explicit protocol now. But after this FLIP, I think it should
> > not
> > > > > > be broken.
> > > > > > I'm also wondering could we execute the upstream and downstream
> > > > > > regions at the same time if we have enough resources. It can
> > shorten
> > > > > > the running time of large job. We should not break the protocol
> of
> > > > > > blocking edge. But if it is possible to change the data exchange
> > mode
> > > > > > of two regions dynamically?
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > > On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]>
> wrote:
> > > > > > >
> > > > > > > Thanks for reporting this Yangze.
> > > > > > > I have update the permission to those images. Everyone are able
> > to
> > > > view
> > > > > > them now.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Zhu Zhu
> > > > > > >
> > > > > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
> > > > > > >>
> > > > > > >> Thanks for driving this discussion, Zhu Zhu & Gary.
> > > > > > >>
> > > > > > >> I found that the image link in this FLIP is not working well.
> > > When I
> > > > > > >> open that link, Google doc told me that I have no access
> > > privilege.
> > > > > > >> Could you take a look at that issue?
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Yangze Guo
> > > > > > >>
> > > > > > >> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]>
> > wrote:
> > > > > > >> >
> > > > > > >> > Hi community,
> > > > > > >> >
> > > > > > >> > In the past releases, we have been working on refactoring
> > > Flink's
> > > > > > scheduler
> > > > > > >> > with the goal of making the scheduler extensible [1]. We
> have
> > > > rolled
> > > > > > out
> > > > > > >> > most of the intended refactoring in Flink 1.10, and we think
> > it
> > > is
> > > > > > now time
> > > > > > >> > to leverage our newly introduced abstractions to implement a
> > new
> > > > > > resource
> > > > > > >> > optimized scheduling strategy: Pipelined Region Scheduling.
> > > > > > >> >
> > > > > > >> > This scheduling strategy aims at:
> > > > > > >> >
> > > > > > >> >     * avoidance of resource deadlocks when running batch
> jobs
> > > > > > >> >
> > > > > > >> >     * tunable with respect to resource consumption and
> > > throughput
> > > > > > >> >
> > > > > > >> > More details can be found in the Wiki [2]. We are looking
> > > forward
> > > > to
> > > > > > your
> > > > > > >> > feedback.
> > > > > > >> >
> > > > > > >> > Best,
> > > > > > >> >
> > > > > > >> > Zhu Zhu & Gary
> > > > > > >> >
> > > > > > >> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> > > > > > >> >
> > > > > > >> > [2]
> > > > > > >> >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> > > > > >
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-119: Pipelined Region Scheduling

Zhu Zhu
Thanks for the nice suggestion Till.
The section 'Bulk Slot Allocation' is updated.

Thanks,
Zhu Zhu

Gary Yao <[hidden email]> 于2020年3月30日周一 下午3:38写道:

> >
> > The links work for me now. Someone might have fixed them. Never mind.
> >
>
> Actually, I fixed the links after seeing your email. Thanks for reporting.
>
> Best,
> Gary
>
> On Mon, Mar 30, 2020 at 3:48 AM Xintong Song <[hidden email]>
> wrote:
>
> > @ZhuZhu
> >
> > The links work for me now. Someone might have fixed them. Never mind.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Mar 30, 2020 at 1:31 AM Zhu Zhu <[hidden email]> wrote:
> >
> > > Thanks for the comments!
> > >
> > > To Xintong,
> > > It's a bit strange since the in page links work as expected. Would you
> > take
> > > another try?
> > >
> > > To Till,
> > > - Regarding the idea to improve to SlotProvider interface
> > > I think it is a good idea and thanks a lot! In the current design we
> make
> > > slot requests for batch jobs to wait for resources without timeout as
> > long
> > > as the JM see enough slots overall. This implicitly add assumption that
> > > tasks can finish and slots are be returned. This, however, would not
> work
> > > in the mixed bounded/unbounded workloads as you mentioned.
> > > Your idea looks more clear that it always allow slot allocations to
> wait
> > > and not time out as long as it see enough slots. And the 'enough' check
> > is
> > > with regard to slots that can be returned (for bounded tasks) and slots
> > > that will be occupied forever (for unbounded tasks), so that streaming
> > jobs
> > > can naturally throw slot allocation timeout errors if the cluster does
> > not
> > > have enough resources for all the tasks to run at the same time.
> > > I will take a deeper thought to see how we can implement it this way.
> > >
> > > - Regarding the idea to solve "Resource deadlocks when slot allocation
> > > competition happens between multiple jobs in a session cluster"
> > > Agreed it's also possible to let the RM to revoke the slots to unblock
> > the
> > > oldest bulk of requests first. That would require some extra work to
> > change
> > > the RM to holds the requests before it is sure the slots are
> successfully
> > > assigned to the JM (currently the RM removes pending requests right
> after
> > > the requests are sent to TM without confirming wether the slot offers
> > > succeed). We can look deeper into it later when we are about to support
> > > variant sizes slots.
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> > >
> > > Till Rohrmann <[hidden email]> 于2020年3月27日周五 下午10:59写道:
> > >
> > > > Thanks for creating this FLIP Zhu Zhu and Gary!
> > > >
> > > > +1 for adding pipelined region scheduling.
> > > >
> > > > Concerning the extended SlotProvider interface I have an idea how we
> > > could
> > > > further improve it. If I am not mistaken, then you have proposed to
> > > > introduce the two timeouts in order to distinguish between batch and
> > > > streaming jobs and to encode that batch job requests can wait if
> there
> > > are
> > > > enough resources in the SlotPool (not necessarily being available
> right
> > > > now). I think what we actually need to tell the SlotProvider is
> > whether a
> > > > request will use the slot only for a limited time or not. This is
> > exactly
> > > > the difference between processing bounded and unbounded streams. If
> the
> > > > SlotProvider knows this difference, then it can tell which slots will
> > > > eventually be reusable and which not. Based on this it can tell
> > whether a
> > > > slot request can be fulfilled eventually or whether we fail after the
> > > > specified timeout. Another benefit of this approach would be that we
> > can
> > > > easily support mixed bounded/unbounded workloads. What we would need
> to
> > > > know for this approach is whether a pipelined region is processing a
> > > > bounded or unbounded stream.
> > > >
> > > > To give an example let's assume we request the following sets of
> slots
> > > > where each pipelined region requires the same slots:
> > > >
> > > > slotProvider.allocateSlots(pr1_bounded, timeout);
> > > > slotProvider.allocateSlots(pr2_unbounded, timeout);
> > > > slotProvider.allocateSlots(pr3_bounded, timeout);
> > > >
> > > > Let's assume we receive slots for pr1_bounded in < timeout and can
> then
> > > > fulfill the request. Then we request pr2_unbounded. Since we know
> that
> > > > pr1_bounded will complete eventually, we don't fail this request
> after
> > > > timeout. Next we request pr3_bounded after pr2_unbounded has been
> > > > completed. In this case, we see that we need to request new resources
> > > > because pr2_unbounded won't release its slots. Hence, if we cannot
> > > allocate
> > > > new resources within timeout, we fail this request.
> > > >
> > > > A small comment concerning "Resource deadlocks when slot allocation
> > > > competition happens between multiple jobs in a session cluster":
> > Another
> > > > idea to solve this situation would be to give the ResourceManager the
> > > right
> > > > to revoke slot assignments in order to change the mapping between
> > > requests
> > > > and available slots.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Fri, Mar 27, 2020 at 12:44 PM Xintong Song <[hidden email]
> >
> > > > wrote:
> > > >
> > > > > Gary & Zhu Zhu,
> > > > >
> > > > > Thanks for preparing this FLIP, and a BIG +1 from my side. The
> > > trade-off
> > > > > between resource utilization and potential deadlock problems has
> > always
> > > > > been a pain. Despite not solving all the deadlock cases, this FLIP
> is
> > > > > definitely a big improvement. IIUC, it has already covered all the
> > > > existing
> > > > > single job cases, and all the mentioned non-covered cases are
> either
> > in
> > > > > multi-job session clusters or with diverse slot resources in
> future.
> > > > >
> > > > > I've read through the FLIP, and it looks really good to me. Good
> job!
> > > All
> > > > > the concerns and limitations that I can think of have already been
> > > > clearly
> > > > > stated, with reasonable potential future solutions. From the
> > > perspective
> > > > of
> > > > > fine-grained resource management, I do not see any
> > serious/irresolvable
> > > > > conflict at this time.
> > > > >
> > > > > nit: The in-page links are not working. I guess those are copied
> from
> > > > > google docs directly?
> > > > >
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Mar 27, 2020 at 6:26 PM Zhu Zhu <[hidden email]> wrote:
> > > > >
> > > > > > To Yangze,
> > > > > >
> > > > > > >> the blocking edge will not be consumable before the upstream
> is
> > > > > > finished.
> > > > > > Yes. This is how we define a BLOCKING result partition, "Blocking
> > > > > > partitions represent blocking data exchanges, where the data
> stream
> > > is
> > > > > > first fully produced and then consumed".
> > > > > >
> > > > > > >> I'm also wondering could we execute the upstream and
> downstream
> > > > > regions
> > > > > > at the same time if we have enough resources
> > > > > > It may lead to resource waste since the tasks in downstream
> regions
> > > > > cannot
> > > > > > read any data before the upstream region finishes. It saves a bit
> > > time
> > > > on
> > > > > > schedule, but usually it does not make much difference for large
> > > jobs,
> > > > > > since data processing takes much more time. For small jobs, one
> can
> > > > make
> > > > > > all edges PIPELINED so that all the tasks can be scheduled at the
> > > same
> > > > > > time.
> > > > > >
> > > > > > >> is it possible to change the data exchange mode of two regions
> > > > > > dynamically?
> > > > > > This is not in the scope of the FLIP. But we are moving forward
> to
> > a
> > > > more
> > > > > > extensible scheduler (FLINK-10429) and resource aware scheduling
> > > > > > (FLINK-10407).
> > > > > > So I think it's possible we can have a scheduler in the future
> > which
> > > > > > dynamically changes the shuffle type wisely regarding available
> > > > > resources.
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu Zhu
> > > > > >
> > > > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 下午4:49写道:
> > > > > >
> > > > > > > Thanks for updating!
> > > > > > >
> > > > > > > +1 for supporting the pipelined region scheduling. Although we
> > > could
> > > > > > > not prevent resource deadlock in all scenarios, it is really a
> > big
> > > > > > > step.
> > > > > > >
> > > > > > > The design generally LGTM.
> > > > > > >
> > > > > > > One minor thing I want to make sure. If I understand correctly,
> > the
> > > > > > > blocking edge will not be consumable before the upstream is
> > > finished.
> > > > > > > Without it, when the failure occurs in the upstream region,
> there
> > > is
> > > > > > > still possible to have a resource deadlock. I don't know
> whether
> > it
> > > > is
> > > > > > > an explicit protocol now. But after this FLIP, I think it
> should
> > > not
> > > > > > > be broken.
> > > > > > > I'm also wondering could we execute the upstream and downstream
> > > > > > > regions at the same time if we have enough resources. It can
> > > shorten
> > > > > > > the running time of large job. We should not break the protocol
> > of
> > > > > > > blocking edge. But if it is possible to change the data
> exchange
> > > mode
> > > > > > > of two regions dynamically?
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Fri, Mar 27, 2020 at 1:15 PM Zhu Zhu <[hidden email]>
> > wrote:
> > > > > > > >
> > > > > > > > Thanks for reporting this Yangze.
> > > > > > > > I have update the permission to those images. Everyone are
> able
> > > to
> > > > > view
> > > > > > > them now.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Zhu Zhu
> > > > > > > >
> > > > > > > > Yangze Guo <[hidden email]> 于2020年3月27日周五 上午11:25写道:
> > > > > > > >>
> > > > > > > >> Thanks for driving this discussion, Zhu Zhu & Gary.
> > > > > > > >>
> > > > > > > >> I found that the image link in this FLIP is not working
> well.
> > > > When I
> > > > > > > >> open that link, Google doc told me that I have no access
> > > > privilege.
> > > > > > > >> Could you take a look at that issue?
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Yangze Guo
> > > > > > > >>
> > > > > > > >> On Fri, Mar 27, 2020 at 1:38 AM Gary Yao <[hidden email]>
> > > wrote:
> > > > > > > >> >
> > > > > > > >> > Hi community,
> > > > > > > >> >
> > > > > > > >> > In the past releases, we have been working on refactoring
> > > > Flink's
> > > > > > > scheduler
> > > > > > > >> > with the goal of making the scheduler extensible [1]. We
> > have
> > > > > rolled
> > > > > > > out
> > > > > > > >> > most of the intended refactoring in Flink 1.10, and we
> think
> > > it
> > > > is
> > > > > > > now time
> > > > > > > >> > to leverage our newly introduced abstractions to
> implement a
> > > new
> > > > > > > resource
> > > > > > > >> > optimized scheduling strategy: Pipelined Region
> Scheduling.
> > > > > > > >> >
> > > > > > > >> > This scheduling strategy aims at:
> > > > > > > >> >
> > > > > > > >> >     * avoidance of resource deadlocks when running batch
> > jobs
> > > > > > > >> >
> > > > > > > >> >     * tunable with respect to resource consumption and
> > > > throughput
> > > > > > > >> >
> > > > > > > >> > More details can be found in the Wiki [2]. We are looking
> > > > forward
> > > > > to
> > > > > > > your
> > > > > > > >> > feedback.
> > > > > > > >> >
> > > > > > > >> > Best,
> > > > > > > >> >
> > > > > > > >> > Zhu Zhu & Gary
> > > > > > > >> >
> > > > > > > >> > [1] https://issues.apache.org/jira/browse/FLINK-10429
> > > > > > > >> >
> > > > > > > >> > [2]
> > > > > > > >> >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>