[DISCUSS] FLIP-67: Global partitions lifecycle

Re: [DISCUSS] FLIP-67: Global partitions lifecycle

Zhijiang(wangzhijiang999)
Thanks for these further considerations Chesnay!

I think there may have been some misunderstanding. Actually, I was not against the proposal Till suggested earlier, and I think it is a proper way to do this.

My previous proposal was not about excluding the ShuffleService completely. The ShuffleService can be regarded as a factory that creates the ShuffleMaster on the JM/RM side and the ShuffleEnvironment on the TE side.

- For the ShuffleEnvironment on the TE side: I have no concerns. The TE receives an RPC call to delete local/global partitions and handles it via the ShuffleEnvironment, in much the same way as local partitions are handled today.
- For the ShuffleMaster side: I saw some earlier discussions about multiple ShuffleMaster instances running in different components. I was not against that approach in essence; I only wondered whether it would make this feature more complex. So my proposal was merely to exclude the ShuffleMaster, if possible, to make the implementation a bit easier. I thought there could be a PartitionTracker-like component in the RM for tracking/deleting global partitions, just as the JM does today: the partition state is reported from the TEs and maintained in the RM's PartitionTracker, which can trigger global partition release via the TE gateway directly, without going through the ShuffleMaster (which is also stateless now). In fact, in the existing PartitionTrackerImpl in the JM, the RPC call TE#releasePartitions is in some cases also triggered without the ShuffleMaster, which can be regarded as a shortcut. Of course, I am also in favour of always performing the actual partition release via the ShuffleMaster; that form seems more elegant. (See the sketch below.)
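
As a rough illustration only, such an RM-side tracker might look like the following sketch; the names are hypothetical, not existing Flink APIs:

    import java.util.Collection;

    import org.apache.flink.runtime.clusterframework.types.ResourceID;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

    // Hypothetical RM-side tracker for global/cluster partitions; names
    // are illustrative only.
    interface ResourceManagerPartitionTracker {

        // Heartbeat payloads from a TE report the partitions it hosts.
        void processTaskExecutorReport(
                ResourceID taskExecutorId,
                Collection<ResultPartitionID> hostedPartitions);

        // Releases a partition by calling the hosting TE's gateway
        // directly, without going through the ShuffleMaster.
        void releaseClusterPartition(ResultPartitionID partitionId);

        // Forgets all partitions of a TE that shut down or disconnected.
        void processTaskExecutorShutdown(ResourceID taskExecutorId);
    }
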
I do not want this minor concern to block this feature or overturn your earlier conclusion. Moreover, Till's recent reply already dispels it. :)

Best,
Zhijiang
------------------------------------------------------------------
From: Chesnay Schepler <[hidden email]>
Send Time: Monday, October 14, 2019 07:00
To: dev <[hidden email]>; Till Rohrmann <[hidden email]>; zhijiang <[hidden email]>
Subject: Re: [DISCUSS] FLIP-67: Global partitions lifecycle

I'm quite torn on whether to exclude the ShuffleServices from the
proposal. I think I'm now on my third or fourth iteration for a
response, so I'll just send both so I can stop thinking for a bit about
whether to push for one or the other:

Opinion A, aka "Nu Uh":

    I'm not in favor of excluding the shuffle master from this proposal;
    I believe it raises interesting questions that should be discussed
    beforehand; otherwise we may just end up developing ourselves into a
    corner.
    Unless there are good reasons for doing so I'd prefer to keep the
    functionality across shuffle services consistent.
    And man, my last sentence is giving me headaches (how can you
    introduce inconsistencies across shuffle services if you don't even
    touch them?..)

    Ultimately the RM only needs the ShuffleService for two things, which
    are fairly straightforward:

     1. list partitions
     2. delete partitions
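
    As a rough sketch (names invented for illustration, not existing
    Flink APIs), such an RM-facing interface could be as small as:

        import java.util.Collection;
        import java.util.concurrent.CompletableFuture;

        import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

        // Hypothetical minimal shuffle interface for the RM.
        interface RmShuffleService {

            // 1. list partitions, e.g. to back a REST listing endpoint
            CompletableFuture<Collection<ResultPartitionID>> listClusterPartitions();

            // 2. delete partitions, e.g. to back a REST deletion endpoint
            CompletableFuture<Void> deleteClusterPartition(ResultPartitionID partitionId);
        }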

    Both of these are /exclusively/ used via the REST APIs. In terms of
    scope I wanted this proposal to contain something that feels
    complete. If there is functionality to have a partition stick
    around, there needs to be a mechanism to delete it. Thus you also
    need a way to list them, simply for practical purposes. I do believe
    that without these this whole proposal is very much incomplete and
    would hate to see them excluded. It just /makes sense/ to have them.
    Yes, technically speaking...

    Could we exclude the external shuffle services from this logic?
    Sure, but I'm quite worried that we will not tackle this problem
    again for 1.10, and if we don't we end up with really inconsistent
    behavior across versions. In 1.9 you can have local state in your
    master implementation and, bar extraordinary circumstances, will
    get a release call for each partition that was registered. In 1.10
    that last part goes down the drain, and in 1.X the last part is back
    in play but you can't have local state anymore since another
    instance is running on the RM.

    Who is even supposed to keep up with that? It's still an interface
    that is exposed to every user. I don't think we should impose
    constraints in such a fast-and-loose fashion.

    Lastly, the fact that we can implement this in a way where it works
    for some shuffle services and not others should already be quite a
    red flag. Maybe the RM shouldn't do any tracking at all and should
    just forward the heartbeat payload to the ThinShuffleMaster present
    on the RM.
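
    A minimal sketch of that forwarding variant, with purely
    illustrative names:

        import java.util.Collection;

        import org.apache.flink.runtime.clusterframework.types.ResourceID;
        import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

        // Hypothetical RM-side handler that keeps no state of its own and
        // simply forwards TE heartbeat payloads to the shuffle component.
        class ForwardingPartitionHandler {

            // Placeholder for the RM-side shuffle component.
            interface ThinShuffleMaster {
                void processPartitionReport(
                        ResourceID taskExecutorId,
                        Collection<ResultPartitionID> hostedPartitions);
            }

            private final ThinShuffleMaster shuffleMaster;

            ForwardingPartitionHandler(ThinShuffleMaster shuffleMaster) {
                this.shuffleMaster = shuffleMaster;
            }

            // Called for each TE heartbeat; the payload lists the cluster
            // partitions the TE currently hosts.
            void heartbeatFromTaskExecutor(
                    ResourceID taskExecutorId,
                    Collection<ResultPartitionID> payload) {
                shuffleMaster.processPartitionReport(taskExecutorId, payload);
            }
        }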

Opinion B, aka "technically it would be fine"

    The counterpoint to the whole REST API completeness argument is that
    while the /runtime/ /supports/ having partitions stick around, there
    is technically no way for anyone to enable such behavior at runtime.
    Hence, with no user-facing APIs to enable the feature, we don't
    necessarily need a user-facing API for management purposes, and
    could defer both to a later point where this feature is exposed
    fully to users.

    But then it's hard to justify having any communication between the
    TE and RM at all; it literally serves no purpose. The TE could just
    keep cluster partitions around until the RM disconnects, which would
    then also raise the question of what of substance is left in
    this proposal.

@Till yes, the RM should work against a different interface; I don't
think anyone has argued against that. Let's put this point to rest. :)

On 13/10/2019 11:04, Till Rohrmann wrote:

> I think we won't necessarily run multiple ShuffleMasters. I think it would
> be better to pass a leaner interface into the RM that only handles the
> deletion of the global result partitions.
>
> Letting the TEs handle the deletion of the global result partitions might
> work as long as we don't have an external shuffle service implementation.
> Hence, it could be a first step to decrease complexity, but in order to
> complete this feature I think we need to do it differently.
>
> Cheers,
> Till
>
> On Sat, Oct 12, 2019 at 7:39 AM zhijiang <[hidden email]>
> wrote:
>
>> Sorry for the delay in catching up with the recent progress. Thanks for the FLIP
>> update and the valuable discussions!
>>
>> I also like the term job/cluster partitions, and agree with most of the
>> previous comments.
>>
>> Only one concern left, on the ShuffleMaster side:
>>> However, if the separation of JM/RM into separate processes, as outlined
>> in FLIP-6, is ever fully realized it necessarily implies that multiple
>> shuffle master instances may exist for a given shuffle service.
>>
>> My previous thought was that one ShuffleService factory creates one
>> ShuffleMaster instance. If we have multiple ShuffleMaster instances, we
>> might also need different ShuffleService factories.
>> And it seems that different ShuffleMaster instances could run in different
>> components as needed, e.g. the dispatcher, JM, or RM.
>>
>> Is it also feasible to not touch the ShuffleMaster concept in this FLIP, to
>> make things a bit easier? I mean the ShuffleMaster would still run in the JM
>> component and be responsible for job partitions. For cluster partitions,
>> the RM could interact with the TEs directly: the TEs would report global
>> partitions as heartbeat payloads to the RM, and the RM could call
>> TE#releaseGlobalPartitions directly, not via the ShuffleMaster. The RM
>> could even pass the released global partitions back as heartbeat payloads
>> to the TEs to avoid an additional explicit RPC call, though this would
>> delay partition release by up to the heartbeat interval.
>>
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From: Chesnay Schepler <[hidden email]>
>> Send Time: Friday, October 11, 2019 10:21
>> To: dev <[hidden email]>; Till Rohrmann <[hidden email]>
>> Subject: Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>>
>> ooooh I like job-/cluster partitions.
>>
>> On 10/10/2019 16:27, Till Rohrmann wrote:
>>> I think we should introduce a separate interface for the ResourceManager
>> so
>>> that it can list and delete global result partitions from the shuffle
>>> service implementation. As long as the JM and RM run in the same process,
>>> this interface could be implemented by the ShuffleMaster implementations.
>>> However, we should make sure that we don't introduce unnecessary
>>> concurrency. If that should be the case, then it might be simpler to have
>>> two separate components.
>>>
>>> Some ideas for the naming problem:
>>>
>>> local/global: job/cluster, intra/inter
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <[hidden email]>
>> wrote:
>>>> Are there any other opinions in regards to the naming scheme?
>>>> (local/global, promote)
>>>>
>>>> On 06/09/2019 15:16, Chesnay Schepler wrote:
>>>>> Hello,
>>>>>
>>>>> FLIP-36 (interactive programming)
>>>>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>>>>> proposes a new programming paradigm where jobs are built incrementally
>>>>> by the user.
>>>>>
>>>>> To support this in an efficient manner I propose to extend partition
>>>>> life-cycle to support the notion of /global partitions/, which are
>>>>> partitions that can exist beyond the life-time of a job.
>>>>>
>>>>> These partitions could then be re-used by subsequent jobs in a fairly
>>>>> efficient manner, as they don't have to be persisted to external
>>>>> storage first, and consuming tasks could be scheduled to exploit
>>>>> data-locality.
>>>>>
>>>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor
>>>>> and ResourceManager to support this from a life-cycle perspective.
>>>>>
>>>>> This FLIP does /not/ concern itself with the /usage/ of global
>>>>> partitions, including client-side APIs, job-submission, scheduling and
>>>>> reading said partitions; these are all follow-ups that will either be
>>>>> part of FLIP-36 or spliced out into separate FLIPs.
>>>>>
>>>>>
>>



Re: [DISCUSS] FLIP-67: Global partitions lifecycle

Chesnay Schepler
I have updated the FLIP.

- adopted the job-/cluster partitions naming scheme
- outlined the interface for the new component living in the RM (currently
called ThinShuffleMaster, but I'm not a fan of the name; suggestions
would be appreciated)
- added a note that the ShuffleService changes are only necessary for
external shuffle services, and could be omitted in a first version

Unless there are objections I'll start a vote thread later today.



Re: [DISCUSS] FLIP-67: Global partitions lifecycle

Zhu Zhu
Thanks Chesnay for proposing this FLIP! And sorry for the late response.
The FLIP overall looks good to me, except for one question.

- If a cluster partition does not exist in the RM, how can users tell
whether it has not been produced yet or has already been released?
Users/InteractiveQuery may need this information to decide whether to
wait or to re-execute the producer job.
One way I can think of is to also check the producer job's state: an
unavailable partition of a finished job means the partition was released.
But since cluster partitions are reported to the RM via the TM heartbeat,
there can be a bad case where the job has finished but the partition has
not yet been reported to the RM.
One solution for that case might be for the TM to notify the RM immediately
when partitions are promoted, as a supplement to the heartbeat. This would
also shorten the time a consumer job waits for a cluster partition to
become available, especially for a sequence of short-lived jobs. It does,
however, introduce a dependency of the JM on the RM when a job finishes,
which is unwanted. (A rough sketch of such a notification follows.)
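
As a rough illustration of that supplementary notification (all names
hypothetical, not existing Flink APIs):

    import java.util.Set;

    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

    // Hypothetical TE-side hook: on promotion, eagerly push an update to
    // the RM instead of waiting for the next heartbeat. The heartbeat
    // still carries the full report, so a lost notification is repaired
    // on the next beat.
    class PromotionNotifier {

        // Placeholder for the RM RPC gateway.
        interface ResourceManagerGateway {
            void notifyClusterPartitionsPromoted(Set<ResultPartitionID> promoted);
        }

        private final ResourceManagerGateway resourceManagerGateway;

        PromotionNotifier(ResourceManagerGateway resourceManagerGateway) {
            this.resourceManagerGateway = resourceManagerGateway;
        }

        void onPartitionsPromoted(Set<ResultPartitionID> promoted) {
            // Eager push; cheap, and bounded by the number of promotions.
            resourceManagerGateway.notifyClusterPartitionsPromoted(promoted);
        }
    }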


Thanks,
Zhu Zhu


Re: [DISCUSS] FLIP-67: Global partitions lifecycle

Till Rohrmann
Hi Zhu Zhu,

the cluster partition does not need to be registered at the RM before it
can be used. The cluster partition descriptor will be reported to the
client as part of the job execution result. This information is used to
construct a JobGraph which can consume from a cluster partition. The
cluster partition descriptor contains all the information necessary to read
the partition. Hence, a job consuming this partition will simply deploy the
consumer on a TM and then read the cluster partition described by the
cluster partition descriptor. If the partition is no longer available, then
the job will fail and the client needs to handle the situation. If the
client knows how to reprocess the partition, then it would re-submit the
producing job. (A sketch of this flow follows below.)
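
As a sketch of that flow (none of these types exist under these names in
Flink; they only illustrate the descriptor handover):

    import java.util.Collection;

    // Hypothetical client-side flow for reusing a cluster partition.
    class ClusterPartitionReuseExample {

        interface ClusterPartitionDescriptor {} // all info needed to read the partition

        interface Job {} // stands in for a JobGraph

        interface Result { // stands in for the job execution result
            Collection<ClusterPartitionDescriptor> getClusterPartitionDescriptors();
        }

        static class PartitionLostException extends Exception {}

        interface Client {
            Result submitJobAndWait(Job job) throws PartitionLostException;
        }

        // Wires the consumer's sources to read the described partitions
        // directly from the TEs hosting them.
        static Job buildConsumer(Collection<ClusterPartitionDescriptor> descriptors) {
            throw new UnsupportedOperationException("illustrative only");
        }

        static void run(Client client, Job producer) throws PartitionLostException {
            // 1) The execution result reports the produced cluster partitions.
            Result result = client.submitJobAndWait(producer);

            // 2) Construct a job that consumes from those partitions.
            Job consumer = buildConsumer(result.getClusterPartitionDescriptors());

            try {
                client.submitJobAndWait(consumer);
            } catch (PartitionLostException e) {
                // 3) Partition gone: re-run the producer, then retry.
                Result reproduced = client.submitJobAndWait(producer);
                client.submitJobAndWait(
                        buildConsumer(reproduced.getClusterPartitionDescriptors()));
            }
        }
    }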

Cheers,
Till


Re: [DISCUSS] FLIP-67: Global partitions lifecycle

Zhu Zhu
Thanks Till for the explanation! That looks good to me.

Thanks,
Zhu Zhu

Till Rohrmann <[hidden email]> 于2019年10月21日周一 上午2:45写道:

> Hi Zhu Zhu,
>
> the cluster partition does not need to be registered at the RM before it
> can be used. The cluster partition descriptor will be reported to the
> client as part of the job execution result. This information is used to
> construct a JobGraph which can consume from a cluster partition. The
> cluster partition descriptor contains all the information necessary to read
> the partition. Hence, a job consuming this partition will simply deploy the
> consumer on a TM and then read the cluster partition described by the
> cluster partition descriptor. If the partition is no longer available, then
> the job will fail and the client needs to handle the situation. If the
> client knows how to reprocess the partition, then it would submit the
> producing job.
>
> Cheers,
> Till
>
> On Sun, Oct 20, 2019 at 12:23 PM Zhu Zhu <[hidden email]> wrote:
>
> > Thanks Chesnay for proposing this FLIP! And sorry for the late response
> on
> > it.
> > The FLIP overall looks good to me, except for one question.
> >
> > - If a cluster partition does not exist in RM, how can users tell whether
> > it is not produced yet, or it is already released?
> > Users/InteractiveQuery may need this information to decide to whether to
> > wait or re-execute the producer job.
> > One way I can think of is to also check the producer job's state --
> > unavailable partition of a finished job means the partition is released.
> > But as the cluster partition is notified to RM via TM heartbeat, there
> can
> > be bad case if job is finished but the partition is not updated to RM
> yet.
> > One solution of the bad case might be that TM notifies RM instantly when
> > partitions are promoted, as a supplementary to the TM heartbeat way. It
> > also shortens the time that a consumer job waits for a cluster partition
> to
> > become available, especially for a sequence of short lived jobs. This
> > however introduces JM dependency on RM on job finishes, which is
> unwanted.
> >
> >
> > Thanks,
> > Zhu Zhu
> >
> > Chesnay Schepler <[hidden email]> 于2019年10月15日周二 下午6:48写道:
> >
> >> I have updated the FLIP.
> >>
> >> - adopted job-/cluster partitions naming scheme
> >> - out-lined interface for new component living in the RM (currently
> >> called ThinShuffleMaster, but I'm not a fan of the name. Suggestions
> >> would be appreciated)
> >> - added a note that the ShuffleService changes are only necessary for
> >> external shuffle services, which could be omitted in a first version
> >>
> >> Unless there are objections I'll start a vote thread later today.
> >>
> >> On 14/10/2019 06:28, Zhijiang wrote:
> >> > Thanks for these further considerations Chesnay!
> >> >
> >> > I guess we might have some misunderstanding. Actually I was not
> >> > against the previous proposal Till suggested before, and I think it is
> >> > a formal way to do that.
> >> >
> >> > And my previous proposal was not for excluding the ShuffleService
> >> > completely. The ShuffleService can be regarded as a factory for
> >> > creating ShuffleMaster on JM/RM side and creating ShuffleEnvironment
> >> > on TE side.
> >> >
> >> >  *
> >> >     For the ShuffleEnvironment on TE side: I do not have concerns
> >> >     always. The TE receives RPC call for deleting local/global
> >> >     partitions and then handle them via ShuffleEnvironment, just the
> >> >     similar way as local partitions now.
> >> >  *
> >> >     For the ShuffleMaster side: I saw some previous disuccsions on
> >> >     multiple ShuffleMaster instances run in different components. I
> >> >     was not against this way in essence, but only wonder it might
> >> >     bring this feature complex to consider that. So my proposal was
> >> >     only for excluding ShuffleMaster if possible to make
> >> >     implementation a bit easy. I thought there might have a somewhat
> >> >     PartitionTracker component in RM for tracking/deleting global
> >> >     partitions, just as we did the way now in JM. The partition state
> >> >     is reported from TE and maintained in PartitionTracker of RM, and
> >> >     the PartitionTracker could trigger global partition release with
> >> >     TE gateway directly, and not further via ShuffleMaster(it is also
> >> >     stateless now). And actually in existing PartitionTrackerImpl in
> >> >     JM, the PRC call on TE#releasePartitions is also triggered not via
> >> >     ShuffleMaster in some cases, and it can be regareded as a shortcut
> >> >     way. Of course I am also in favour of via ShuffleMaster to call
> >> >     the actual release partition always, and the form seems elegant.
> >> >
> >> > I do not expect my inconsequential thought would block this feature
> >> > ongoing and disturb your previous conclusion. Moreover, Till's recent
> >> > reply already dispels my previous concern. :)
> >> >
> >> > Best,
> >> > Zhijiang
> >> >
> >> >     ------------------------------------------------------------------
> >> >     From:Chesnay Schepler <[hidden email]>
> >> >     Send Time:2019年10月14日(星期一) 07:00
> >> >     To:dev <[hidden email]>; Till Rohrmann
> >> >     <[hidden email]>; zhijiang <[hidden email]
> >> .invalid>
> >> >     Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> >> >
> >> >     I'm quite torn on whether to exclude the ShuffleServices from the
> >> >     proposal. I think I'm now on my third or fourth iteration for a
> >> >
> >>  response, so I'll just send both so I can stop thinking for a bit about
> >> >
> >> >     whether to push for one or the other:
> >> >
> >> >     Opinion A, aka "Nu Uh":
> >> >
> >> >
> >>      I'm not in favor of excluding the shuffle master from this
> proposal;
> >> >
> >>      I believe it raises interesting questions that should be discussed
> >> >
> >>      beforehand; otherwise we may just end up developing ourselves into
> a
> >> >         corner.
> >> >
> >>      Unless there are good reasons for doing so I'd prefer to keep the
> >> >         functionality across shuffle services consistent.
> >> >         And man, my last sentence is giving me headaches (how can you
> >> >
> >>      introduce inconsistencies across shuffle services if you don't even
> >> >         touch them?..)
> >> >
> >> >
> >> >     Ultimately the RM only needs the ShuffleService for 2 things, which
> >> >     are fairly straight-forward:
> >> >
> >> >          1. list partitions
> >> >          2. delete partitions
> >> >
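(For illustration: a lean RM-facing interface covering exactly those two
operations might look roughly like the Java sketch below. All names are
hypothetical, taken neither from the FLIP nor from the Flink code base.)

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;

    /** Hypothetical sketch: the only two partition operations the RM needs. */
    interface ClusterPartitionLister {

        /** Lists the data sets whose partitions are retained beyond job scope. */
        CompletableFuture<Collection<String>> listDataSets();

        /** Asynchronously releases all partitions belonging to the data set. */
        CompletableFuture<Void> releaseClusterPartitions(String dataSetId);
    }
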
> >> >
> >> >     Both of these are /exclusively/ used via the REST APIs. In terms of
> >> >     scope I wanted this proposal to contain something that feels
> >> >     complete. If there is functionality to have a partition stick
> >> >     around, there needs to be a mechanism to delete it. Thus you also
> >> >     need a way to list them, simply for practical purposes. I do believe
> >> >     that without these this whole proposal is very much incomplete and
> >> >     would hate to see them excluded. It just /makes sense/ to have them.
> >> >     Yes, technically speaking...
> >> >
> >> >     Could we exclude the external shuffle services from this logic?
> >> >     Sure, but I'm quite worried that we will not tackle this problem
> >> >     again for 1.10, and if we don't we end up with really inconsistent
> >> >     behavior across versions. In 1.9 you can have local state in your
> >> >     master implementation, and, bar extraordinary circumstances, will
> >> >     get a release call for every partition that was registered. In 1.10
> >> >     that last part goes down the drain, and in 1.X the last part is back
> >> >     in play, but you can't have local state anymore since another
> >> >     instance is running on the RM.
> >> >
> >> >     Who is even supposed to keep up with that? It's still an interface
> >> >     that is exposed to every user. I don't think we should impose
> >> >     constraints in such a cut-loose fashion.
> >> >
> >> >
> >> >     Lastly, the fact that we can implement this in a way where it works
> >> >     for some shuffle services and not others should already be quite a
> >> >     red flag. The RM maybe shouldn't do any tracking and just forward
> >> >     the heartbeat payload to the ThinShuffleMaster present on the RM.
> >> >
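(A rough sketch of that last idea, assuming a made-up payload type and
facade interface, neither of which is actual Flink API: the RM keeps no
partition state of its own and merely forwards each TE heartbeat payload
to the shuffle-master-side component.)

    import java.util.Set;

    /** Hypothetical heartbeat payload: the data sets hosted on one TE. */
    final class ClusterPartitionReport {
        final Set<String> hostedDataSets;

        ClusterPartitionReport(Set<String> hostedDataSets) {
            this.hostedDataSets = hostedDataSets;
        }
    }

    /** Made-up facade for the shuffle-master instance living next to the RM. */
    interface ThinShuffleMasterSketch {
        void processPartitionReport(String taskExecutorId, ClusterPartitionReport report);
    }

    final class ForwardingHeartbeatListener {
        private final ThinShuffleMasterSketch shuffleMaster;

        ForwardingHeartbeatListener(ThinShuffleMasterSketch shuffleMaster) {
            this.shuffleMaster = shuffleMaster;
        }

        /** Called on every TE heartbeat; no partition state is kept in the RM. */
        void onHeartbeatPayload(String taskExecutorId, ClusterPartitionReport payload) {
            shuffleMaster.processPartitionReport(taskExecutorId, payload);
        }
    }
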
> >> >     Opinion B, aka "technically it would be fine":
> >> >
> >> >
> >> >     The counterpoint to the whole REST API completeness argument is
> >> >     that while the /runtime/ /supports/ having partitions stick around,
> >> >     there is technically no way for anyone to enable such behavior at
> >> >     runtime. Hence, with no user-facing APIs to enable the feature, we
> >> >     don't necessarily need a user-facing API for management purposes,
> >> >     and could defer both to a later point where this feature is exposed
> >> >     fully to users.
> >> >
> >> >     But then it's hard to justify having any communication between the
> >> >     TE and RM at all; it literally serves no purpose. The TE could just
> >> >     keep cluster partitions around until the RM disconnects. Which would
> >> >     then also raise the question of what exactly of substance is left in
> >> >     this proposal.
> >> >
> >> >     @Till yes, the RM should work against a different interface; I
> >> >     don't think anyone has argued against that. Let's put this point to
> >> >     rest. :)
> >> >
> >> >     On 13/10/2019 11:04, Till Rohrmann wrote:
> >> >
> >> >     > I think we won't necessarily run multiple ShuffleMasters. I think
> >> >     > it would be better to pass in a leaner interface into the RM to
> >> >     > only handle the deletion of the global result partitions.
> >> >     >
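(A minimal sketch of what "passing in a leaner interface" could mean in
practice; the interface and class names below are invented for
illustration, not the actual FLIP design.)

    import java.util.concurrent.CompletableFuture;

    /** Narrow, deletion-only view that the RM would be constructed against. */
    interface GlobalPartitionDeleter {
        CompletableFuture<Void> deleteGlobalPartitions(String dataSetId);
    }

    final class ResourceManagerSketch {
        // The RM never sees the full ShuffleMaster, only this narrow interface.
        private final GlobalPartitionDeleter partitionDeleter;

        ResourceManagerSketch(GlobalPartitionDeleter partitionDeleter) {
            this.partitionDeleter = partitionDeleter;
        }

        /** E.g. invoked from a REST handler for partition deletion. */
        CompletableFuture<Void> onDeleteRequest(String dataSetId) {
            return partitionDeleter.deleteGlobalPartitions(dataSetId);
        }
    }
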
> >> >     > Letting the TEs handle the deletion of the global result
> >> >     > partitions might work as long as we don't have an external
> >> >     > shuffle service implementation. Hence, it could be a first step
> >> >     > to decrease complexity, but in order to complete this feature, I
> >> >     > think we need to do it differently.
> >> >     >
> >> >     > Cheers,
> >> >     > Till
> >> >     >
> >> >     > On Sat, Oct 12, 2019 at 7:39 AM zhijiang <[hidden email]> wrote:
> >> >     >
> >> >     >> Sorry for the delay in catching up with the recent progress.
> >> >     >> Thanks for the FLIP update and valuable discussions!
> >> >     >>
> >> >     >> I also like the terms job/cluster partitions, and agree with most
> >> >     >> of the previous comments.
> >> >     >>
> >> >     >> Only one concern left, on the ShuffleMaster side:
> >> >     >>
> >> >     >>> However, if the separation of JM/RM into separate processes, as
> >> >     >>> outlined in FLIP-6, is ever fully realized it necessarily
> >> >     >>> implies that multiple shuffle master instances may exist for a
> >> >     >>> given shuffle service.
> >> >     >>
> >> >     >> My previous thought was that one ShuffleService factory creates
> >> >     >> one ShuffleMaster instance. If we have multiple ShuffleMaster
> >> >     >> instances, we might also need different ShuffleService factories.
> >> >     >> And it seems that different ShuffleMaster instances could run in
> >> >     >> different components on demand, e.g. dispatcher, JM, RM.
> >> >     >>
> >> >     >> Would it also be feasible not to touch the ShuffleMaster concept
> >> >     >> in this FLIP, to make things a bit easier? I mean the
> >> >     >> ShuffleMaster would still run in the JM component and be
> >> >     >> responsible for job partitions. For cluster partitions, the RM
> >> >     >> could interact with the TE directly: the TE would report global
> >> >     >> partitions as payloads via its heartbeat with the RM, and the RM
> >> >     >> could call TE#releaseGlobalPartitions directly, not via the
> >> >     >> ShuffleMaster. The RM could even pass the released global
> >> >     >> partitions back as heartbeat payloads to the TE, to avoid an
> >> >     >> additional explicit RPC call, though this would add some delay to
> >> >     >> partition release, bounded by the heartbeat interval.
> >> >     >>
> >> >     >> Best,
> >> >     >> Zhijiang
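(Zhijiang's alternative, sketched under the assumption of invented gateway
and tracker names, and simplified to one hosting TE per data set: partition
state arrives as heartbeat payload, and release goes straight to the TE
gateway rather than through the ShuffleMaster.)

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    /** Made-up slice of the TE gateway with a direct release RPC. */
    interface TaskExecutorGatewaySketch {
        void releaseGlobalPartitions(Set<String> dataSetIds);
    }

    final class RmPartitionTracker {
        // dataSetId -> gateway of the TE hosting it (simplified: one TE per set)
        private final Map<String, TaskExecutorGatewaySketch> hosts = new HashMap<>();

        /** Updates tracked state from a TE's heartbeat payload. */
        void processHeartbeat(TaskExecutorGatewaySketch gateway, Set<String> hostedDataSets) {
            for (String dataSetId : hostedDataSets) {
                hosts.put(dataSetId, gateway);
            }
        }

        /** Releases a cluster partition via direct RPC, bypassing the ShuffleMaster. */
        void releaseGlobalPartitions(String dataSetId) {
            TaskExecutorGatewaySketch gateway = hosts.remove(dataSetId);
            if (gateway != null) {
                gateway.releaseGlobalPartitions(Collections.singleton(dataSetId));
            }
        }
    }
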
> >> >
> >> >     >> ------------------------------------------------------------------
> >> >     >> From: Chesnay Schepler <[hidden email]>
> >> >     >> Send Time: 2019-10-11 (Friday) 10:21
> >> >     >> To: dev <[hidden email]>; Till Rohrmann <[hidden email]>
> >> >     >> Subject: Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> >> >     >>
> >> >     >> ooooh I like job-/cluster partitions.
> >> >     >>
> >> >     >> On 10/10/2019 16:27, Till Rohrmann wrote:
> >> >     >>> I think we should introduce a separate interface for the
> >> >     >>> ResourceManager so that it can list and delete global result
> >> >     >>> partitions from the shuffle service implementation. As long as
> >> >     >>> the JM and RM run in the same process, this interface could be
> >> >     >>> implemented by the ShuffleMaster implementations. However, we
> >> >     >>> should make sure that we don't introduce unnecessary
> >> >     >>> concurrency. If that should be the case, then it might be
> >> >     >>> simpler to have two separate components.
> >> >     >>>
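(As a sketch of how that could look while JM and RM share a process; the
interface and class names are again made up, not the actual FLIP design.
Once the processes are separated, a standalone component would have to take
over the RM-facing side.)

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;

    /** Hypothetical RM-facing view onto the shuffle service. */
    interface ResourceManagerPartitionService {
        CompletableFuture<Collection<String>> listClusterPartitions();
        CompletableFuture<Void> releaseClusterPartitions(String dataSetId);
    }

    /**
     * While JM and RM live in one process, the existing shuffle master could
     * simply implement the RM-facing interface on the side.
     */
    abstract class InProcessShuffleMasterSketch implements ResourceManagerPartitionService {
        // existing job-scoped shuffle master duties would live here
    }
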
> >> >     >>> Some ideas for the naming problem:
> >> >     >>>
> >> >     >>> local/global: job/cluster, intra/inter
> >> >     >>>
> >> >     >>> Cheers,
> >> >     >>> Till
> >> >     >>>
> >> >     >>> On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <[hidden email]> wrote:
> >> >     >>>> Are there any other opinions in regards to the naming scheme?
> >> >     >>>> (local/global, promote)
> >> >     >>>>
> >> >     >>>> On 06/09/2019 15:16, Chesnay Schepler wrote:
> >> >     >>>>> Hello,
> >> >     >>>>>
> >> >     >>>>> FLIP-36 (interactive programming)
> >> >     >>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
> >> >     >>>>> proposes a new programming paradigm where jobs are built
> >> >     >>>>> incrementally by the user.
> >> >     >>>>>
> >> >     >>>>> To support this in an efficient manner I propose to extend the
> >> >     >>>>> partition life-cycle to support the notion of /global
> >> >     >>>>> partitions/, which are partitions that can exist beyond the
> >> >     >>>>> life-time of a job.
> >> >     >>>>>
> >> >     >>>>> These partitions could then be re-used by subsequent jobs in a
> >> >     >>>>> fairly efficient manner, as they don't have to be persisted to
> >> >     >>>>> external storage first, and consuming tasks could be scheduled
> >> >     >>>>> to exploit data-locality.
> >> >     >>>>>
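(One possible reading of that life-cycle change, as a TE-side sketch with
invented names; the FLIP itself defines the real mechanism. The idea is that
promoted partitions survive the teardown of their producing job.)

    import java.util.HashSet;
    import java.util.Set;

    /** Hypothetical TE-side partition bookkeeping with a promotion step. */
    final class PartitionTableSketch {
        private final Set<String> jobPartitions = new HashSet<>();
        private final Set<String> globalPartitions = new HashSet<>();

        /** Promotes a finished partition so it outlives its producing job. */
        void promote(String partitionId) {
            if (jobPartitions.remove(partitionId)) {
                globalPartitions.add(partitionId);
            }
        }

        /**
         * Job teardown releases only job-scoped partitions; promoted ones
         * stay around until explicitly released (e.g. on request of the RM).
         */
        void releaseJobPartitions() {
            jobPartitions.clear();
        }
    }
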
> >> >     >>>>> The FLIP outlines the required changes on the JobMaster,
> >> >     >>>>> TaskExecutor and ResourceManager to support this from a
> >> >     >>>>> life-cycle perspective.
> >> >     >>>>>
> >> >     >>>>> This FLIP does /not/ concern itself with the /usage/ of global
> >> >     >>>>> partitions, including client-side APIs, job submission,
> >> >     >>>>> scheduling and reading said partitions; these are all
> >> >     >>>>> follow-ups that will either be part of FLIP-36 or split out
> >> >     >>>>> into separate FLIPs.