Hello,
FLIP-36 (interactive programming) <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink> proposes a new programming paradigm where jobs are built incrementally by the user. To support this in an efficient manner I propose to extend partition life-cycle to support the notion of /global partitions/, which are partitions that can exist beyond the life-time of a job. These partitions could then be re-used by subsequent jobs in a fairly efficient manner, as they don't have to persisted to an external storage first and consuming tasks could be scheduled to exploit data-locality. The FLIP outlines the required changes on the JobMaster, TaskExecutor and ResourceManager to support this from a life-cycle perspective. This FLIP does /not/ concern itself with the /usage/ of global partitions, including client-side APIs, job-submission, scheduling and reading said partitions; these are all follow-ups that will either be part of FLIP-36 or spliced out into separate FLIPs. |
Thanks Chesnay for drafting the FLIP and starting this discussion.
I have a couple of comments: * I know that I've also coined the terms global/local result partition but maybe it is not the perfect name. Maybe we could rethink the terminology and call them persistent result partitions? * Nit: I would call the last parameter of void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote) either partitionsToRetain or partitionsToPersistent. * I'm not sure whether partitionsToRelease should contain a global/persistent result partition id. I always thought that the user will be responsible for managing the lifecycle of a global/persistent result partition. * Instead of extending the PartitionTable to be able to store global/persistent and local/transient result partitions, I would rather introduce a global PartitionTable to store the global/persistent result partitions explicitly. I think there is a benefit in making things as explicit as possible. * The handover logic between the JM and the RM for the global/persistent result partitions seems a bit brittle to me. What will happen if the JM cannot reach the RM? I think it would be better if the TM announces the global/persistent result partitions to the RM via its heartbeats. That way we don't rely on an established connection between the JM and RM and we keep the TM as the ground of truth. Moreover, the RM should simply forward the release calls to the TM without much internal logic. Cheers, Till On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> wrote: > Hello, > > FLIP-36 (interactive programming) > < > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink> > > proposes a new programming paradigm where jobs are built incrementally > by the user. > > To support this in an efficient manner I propose to extend partition > life-cycle to support the notion of /global partitions/, which are > partitions that can exist beyond the life-time of a job. > > These partitions could then be re-used by subsequent jobs in a fairly > efficient manner, as they don't have to persisted to an external storage > first and consuming tasks could be scheduled to exploit data-locality. > > The FLIP outlines the required changes on the JobMaster, TaskExecutor > and ResourceManager to support this from a life-cycle perspective. > > This FLIP does /not/ concern itself with the /usage/ of global > partitions, including client-side APIs, job-submission, scheduling and > reading said partitions; these are all follow-ups that will either be > part of FLIP-36 or spliced out into separate FLIPs. > > |
Thanks Chesnay for this FLIP and sorry for touching it a bit delay on my side. I also have some similar concerns which Till already proposed before. 1. The consistent terminology in different components. On JM side, PartitionTracker#getPersistedBlockingPartitions is defined for getting global partitions. And on RM side, we define the method of #registerGlobalPartitions correspondingly for handover the partitions from JM. I think it is better to unify the term in different components for for better understanding the semantic. Concering whether to use global or persistent, I prefer the "global" term personally. Because it describes the scope of partition clearly, and the "persistent" is more like the partition storing way or implementation detail. In other words, the global partition might also be cached in memory of TE, not must persist into files from semantic requirements. Whether memory or persistent file is just the implementation choice. 2. On TE side, we might rename the method #releasePartitions to #releaseOrPromotePartitions which describes the function precisely and keeps consistent with PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). 3. Very agree with Till's suggestions of global PartitionTable on TE side and sticking to TE's heartbeat report to RM for global partitions. 4. Considering ShuffleMaster, it was built inside JM and expected to interactive with JM before. Now the RM also needs to interactive with ShuffleMaster to release global partitions. Then it might be better to move ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should be consistent with RM. 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global partitions for successful jobs" Best, Zhijiang ------------------------------------------------------------------ From:Till Rohrmann <[hidden email]> Send Time:2019年9月10日(星期二) 10:10 To:dev <[hidden email]> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle Thanks Chesnay for drafting the FLIP and starting this discussion. I have a couple of comments: * I know that I've also coined the terms global/local result partition but maybe it is not the perfect name. Maybe we could rethink the terminology and call them persistent result partitions? * Nit: I would call the last parameter of void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote) either partitionsToRetain or partitionsToPersistent. * I'm not sure whether partitionsToRelease should contain a global/persistent result partition id. I always thought that the user will be responsible for managing the lifecycle of a global/persistent result partition. * Instead of extending the PartitionTable to be able to store global/persistent and local/transient result partitions, I would rather introduce a global PartitionTable to store the global/persistent result partitions explicitly. I think there is a benefit in making things as explicit as possible. * The handover logic between the JM and the RM for the global/persistent result partitions seems a bit brittle to me. What will happen if the JM cannot reach the RM? I think it would be better if the TM announces the global/persistent result partitions to the RM via its heartbeats. That way we don't rely on an established connection between the JM and RM and we keep the TM as the ground of truth. Moreover, the RM should simply forward the release calls to the TM without much internal logic. Cheers, Till On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> wrote: > Hello, > > FLIP-36 (interactive programming) > < > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink> > > proposes a new programming paradigm where jobs are built incrementally > by the user. > > To support this in an efficient manner I propose to extend partition > life-cycle to support the notion of /global partitions/, which are > partitions that can exist beyond the life-time of a job. > > These partitions could then be re-used by subsequent jobs in a fairly > efficient manner, as they don't have to persisted to an external storage > first and consuming tasks could be scheduled to exploit data-locality. > > The FLIP outlines the required changes on the JobMaster, TaskExecutor > and ResourceManager to support this from a life-cycle perspective. > > This FLIP does /not/ concern itself with the /usage/ of global > partitions, including client-side APIs, job-submission, scheduling and > reading said partitions; these are all follow-ups that will either be > part of FLIP-36 or spliced out into separate FLIPs. > > |
Hi Chesnay,
Thanks for the proposal. My understanding of the entire workflow step by step is following: - JM maintains the local and global partition metadata when the task runs to create result partitions. The tasks themselves does not distinguish between local / global partitions. Only the JM knows that. - JM releases the local partitions as the job executes. When a job finishes successfully, JM registers the global partitions to the RM. The global partition IDs are set on the client instead of randomly generated, so the client can release global partitions using them. (It would be good to have some human readable string associated with the global result partitions). - Client issues REST call to list / release global partitions. A few thoughts / questions below: 1. Failure cases: * The TEs may remove the result partition while the RM does not know. In this case, the client will receive a runtime error and submit the full DAG to recompute the missing result partition. In this case, RM should release the incomplete global partition. How would RM be notified to do that? * Is it possible the RM looses global partition metadata while the TE still host the data? For example, RM deletes the global partition entry while the release partition call to TE failed. * What would happen if the JM fails before the global partitions are registered to RM? Are users exposed to resource leak if JM does not have HA? * What would happen if the RM fails? Will TE release the partitions by themselves? 2. It looks that TE should be the source of truth of the result partition existence. Does it have to distinguish between global and local result partitions? If TE does not need to distinguish them, it seems the the releasePartition() method in TE could just provide the list of partitions to release, without the partitions to promote. 3. In the current design, RM should be able to release result partitions using ShuffleService. Will RM do this by sending RPC to the TEs? Or will the RM do it by itself? 4. How do we plan to handle the case when there are different shuffle services in the same Flink cluster? For example, a shared standalone cluster. 5. Minor: usually REST API uses `?` to pass the parameters. Is there a reason we use `:` instead? Thanks, Jiangjie (Becket) Qin On Tue, Sep 17, 2019 at 3:22 AM zhijiang <[hidden email]> wrote: > > Thanks Chesnay for this FLIP and sorry for touching it a bit delay on my > side. > > I also have some similar concerns which Till already proposed before. > > 1. The consistent terminology in different components. On JM side, > PartitionTracker#getPersistedBlockingPartitions is defined for getting > global partitions. And on RM side, we define the method of > #registerGlobalPartitions correspondingly for handover the partitions from > JM. I think it is better to unify the term in different components for for > better understanding the semantic. Concering whether to use global or > persistent, I prefer the "global" term personally. Because it describes the > scope of partition clearly, and the "persistent" is more like the partition > storing way or implementation detail. In other words, the global partition > might also be cached in memory of TE, not must persist into files from > semantic requirements. Whether memory or persistent file is just the > implementation choice. > > 2. On TE side, we might rename the method #releasePartitions to > #releaseOrPromotePartitions which describes the function precisely and > keeps consistent with > PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). > > 3. Very agree with Till's suggestions of global PartitionTable on TE side > and sticking to TE's heartbeat report to RM for global partitions. > > 4. Considering ShuffleMaster, it was built inside JM and expected to > interactive with JM before. Now the RM also needs to interactive with > ShuffleMaster to release global partitions. Then it might be better to move > ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should be > consistent with RM. > > 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global > partitions for successful jobs" > > Best, > Zhijiang > > > ------------------------------------------------------------------ > From:Till Rohrmann <[hidden email]> > Send Time:2019年9月10日(星期二) 10:10 > To:dev <[hidden email]> > Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle > > Thanks Chesnay for drafting the FLIP and starting this discussion. > > I have a couple of comments: > > * I know that I've also coined the terms global/local result partition but > maybe it is not the perfect name. Maybe we could rethink the terminology > and call them persistent result partitions? > * Nit: I would call the last parameter of void releasePartitions(JobID > jobId, Collection<ResultPartitionID> partitionsToRelease, > Collection<ResultPartitionID> partitionsToPromote) either > partitionsToRetain or partitionsToPersistent. > * I'm not sure whether partitionsToRelease should contain a > global/persistent result partition id. I always thought that the user will > be responsible for managing the lifecycle of a global/persistent > result partition. > * Instead of extending the PartitionTable to be able to store > global/persistent and local/transient result partitions, I would rather > introduce a global PartitionTable to store the global/persistent result > partitions explicitly. I think there is a benefit in making things as > explicit as possible. > * The handover logic between the JM and the RM for the global/persistent > result partitions seems a bit brittle to me. What will happen if the JM > cannot reach the RM? I think it would be better if the TM announces the > global/persistent result partitions to the RM via its heartbeats. That way > we don't rely on an established connection between the JM and RM and we > keep the TM as the ground of truth. Moreover, the RM should simply forward > the release calls to the TM without much internal logic. > > Cheers, > Till > > On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> > wrote: > > > Hello, > > > > FLIP-36 (interactive programming) > > < > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink > > > > > > proposes a new programming paradigm where jobs are built incrementally > > by the user. > > > > To support this in an efficient manner I propose to extend partition > > life-cycle to support the notion of /global partitions/, which are > > partitions that can exist beyond the life-time of a job. > > > > These partitions could then be re-used by subsequent jobs in a fairly > > efficient manner, as they don't have to persisted to an external storage > > first and consuming tasks could be scheduled to exploit data-locality. > > > > The FLIP outlines the required changes on the JobMaster, TaskExecutor > > and ResourceManager to support this from a life-cycle perspective. > > > > This FLIP does /not/ concern itself with the /usage/ of global > > partitions, including client-side APIs, job-submission, scheduling and > > reading said partitions; these are all follow-ups that will either be > > part of FLIP-36 or spliced out into separate FLIPs. > > > > > > |
Forgot to say that I agree with Till that it seems a good idea to let TEs
register the global partitions to the RM instead of letting JM do it. This simplifies quite a few things. Thanks, Jiangjie (Becket) Qin On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <[hidden email]> wrote: > Hi Chesnay, > > Thanks for the proposal. My understanding of the entire workflow step by > step is following: > > - JM maintains the local and global partition metadata when the task > runs to create result partitions. The tasks themselves does not distinguish > between local / global partitions. Only the JM knows that. > - JM releases the local partitions as the job executes. When a job > finishes successfully, JM registers the global partitions to the RM. The > global partition IDs are set on the client instead of randomly generated, > so the client can release global partitions using them. (It would be good > to have some human readable string associated with the global result > partitions). > - Client issues REST call to list / release global partitions. > > A few thoughts / questions below: > 1. Failure cases: > * The TEs may remove the result partition while the RM does not > know. In this case, the client will receive a runtime error and submit the > full DAG to recompute the missing result partition. In this case, RM should > release the incomplete global partition. How would RM be notified to do > that? > * Is it possible the RM looses global partition metadata while > the TE still host the data? For example, RM deletes the global partition > entry while the release partition call to TE failed. > * What would happen if the JM fails before the global partitions > are registered to RM? Are users exposed to resource leak if JM does not > have HA? > * What would happen if the RM fails? Will TE release the > partitions by themselves? > > 2. It looks that TE should be the source of truth of the result partition > existence. Does it have to distinguish between global and local result > partitions? If TE does not need to distinguish them, it seems the the > releasePartition() method in TE could just provide the list of partitions > to release, without the partitions to promote. > > 3. In the current design, RM should be able to release result > partitions using ShuffleService. Will RM do this by sending RPC to the TEs? > Or will the RM do it by itself? > > 4. How do we plan to handle the case when there are different shuffle > services in the same Flink cluster? For example, a shared standalone > cluster. > > 5. Minor: usually REST API uses `?` to pass the parameters. Is there a > reason we use `:` instead? > > Thanks, > > Jiangjie (Becket) Qin > > On Tue, Sep 17, 2019 at 3:22 AM zhijiang > <[hidden email]> wrote: > >> >> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on my >> side. >> >> I also have some similar concerns which Till already proposed before. >> >> 1. The consistent terminology in different components. On JM side, >> PartitionTracker#getPersistedBlockingPartitions is defined for getting >> global partitions. And on RM side, we define the method of >> #registerGlobalPartitions correspondingly for handover the partitions from >> JM. I think it is better to unify the term in different components for for >> better understanding the semantic. Concering whether to use global or >> persistent, I prefer the "global" term personally. Because it describes the >> scope of partition clearly, and the "persistent" is more like the partition >> storing way or implementation detail. In other words, the global partition >> might also be cached in memory of TE, not must persist into files from >> semantic requirements. Whether memory or persistent file is just the >> implementation choice. >> >> 2. On TE side, we might rename the method #releasePartitions to >> #releaseOrPromotePartitions which describes the function precisely and >> keeps consistent with >> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). >> >> 3. Very agree with Till's suggestions of global PartitionTable on TE side >> and sticking to TE's heartbeat report to RM for global partitions. >> >> 4. Considering ShuffleMaster, it was built inside JM and expected to >> interactive with JM before. Now the RM also needs to interactive with >> ShuffleMaster to release global partitions. Then it might be better to move >> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should be >> consistent with RM. >> >> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global >> partitions for successful jobs" >> >> Best, >> Zhijiang >> >> >> ------------------------------------------------------------------ >> From:Till Rohrmann <[hidden email]> >> Send Time:2019年9月10日(星期二) 10:10 >> To:dev <[hidden email]> >> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle >> >> Thanks Chesnay for drafting the FLIP and starting this discussion. >> >> I have a couple of comments: >> >> * I know that I've also coined the terms global/local result partition but >> maybe it is not the perfect name. Maybe we could rethink the terminology >> and call them persistent result partitions? >> * Nit: I would call the last parameter of void releasePartitions(JobID >> jobId, Collection<ResultPartitionID> partitionsToRelease, >> Collection<ResultPartitionID> partitionsToPromote) either >> partitionsToRetain or partitionsToPersistent. >> * I'm not sure whether partitionsToRelease should contain a >> global/persistent result partition id. I always thought that the user will >> be responsible for managing the lifecycle of a global/persistent >> result partition. >> * Instead of extending the PartitionTable to be able to store >> global/persistent and local/transient result partitions, I would rather >> introduce a global PartitionTable to store the global/persistent result >> partitions explicitly. I think there is a benefit in making things as >> explicit as possible. >> * The handover logic between the JM and the RM for the global/persistent >> result partitions seems a bit brittle to me. What will happen if the JM >> cannot reach the RM? I think it would be better if the TM announces the >> global/persistent result partitions to the RM via its heartbeats. That way >> we don't rely on an established connection between the JM and RM and we >> keep the TM as the ground of truth. Moreover, the RM should simply forward >> the release calls to the TM without much internal logic. >> >> Cheers, >> Till >> >> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> >> wrote: >> >> > Hello, >> > >> > FLIP-36 (interactive programming) >> > < >> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink >> > >> > >> > proposes a new programming paradigm where jobs are built incrementally >> > by the user. >> > >> > To support this in an efficient manner I propose to extend partition >> > life-cycle to support the notion of /global partitions/, which are >> > partitions that can exist beyond the life-time of a job. >> > >> > These partitions could then be re-used by subsequent jobs in a fairly >> > efficient manner, as they don't have to persisted to an external storage >> > first and consuming tasks could be scheduled to exploit data-locality. >> > >> > The FLIP outlines the required changes on the JobMaster, TaskExecutor >> > and ResourceManager to support this from a life-cycle perspective. >> > >> > This FLIP does /not/ concern itself with the /usage/ of global >> > partitions, including client-side APIs, job-submission, scheduling and >> > reading said partitions; these are all follow-ups that will either be >> > part of FLIP-36 or spliced out into separate FLIPs. >> > >> > >> >> |
Thank you for your comments; I've aggregated them a bit and added
comments to each of them. 1) Concept name (proposal: persistent) I agree that "global" is rather undescriptive, particularly so since we never had a notion of "local" partitions. I'm not a fan of "persistent"; as to me this always implies reliable long-term storage which as I understand we aren't shooting for here. I was thinking of "cached" partitions. To Zhijiangs point, we should of course make the naming consistent everywhere. 2) Naming of last parameter of TE#releasePartitions (proposal: partitionsToRetain / partitionsToPersistent) I can see where you're coming from ("promote" is somewhat abstract), but I think both suggestions have downsides. "partitionsToPersistent" to me implies an additional write operation to somewhere, but we aren't doing that. "partitionsToRetain" kind of results in a redundancy with the other argument since retaining is the opposite to releasing a partition; if I want to retain a partition, why am I not just excluding it from the set to release? I quite like "promote" personally; we fundamentally change how the lifecycle for these partitions work, and introducing new keywords isn't a inherently a bad choice. 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; Note: addition of "OrPromote" is dependent on 2) ) Good point. 4) /Till: I'm not sure whether partitionsToRelease should contain a// //global/persistent result partition id. I always thought that the user will// //be responsible for managing the lifecycle of a global/persistent// //result partition./ @Till Please elaborate; which method/argument are you referring to? 4)/Dedicated PartitionTable for global partitions/ Since there is only one RM for each TE a PartitionTable is unnecessary; a simple set will suffice. Alternatively, we could introduce such a dedicated set into the PartitionTable to keep these data-structures close. 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs retain global partitions for successful jobs"/ Will fix it. 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and expected to interactive with JM before. Now the RM also needs to interactive with ShuffleMaster to release global partitions. Then it might be better to move ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should be consistent with RM./ Yes, I alluded to this in the FLIP but should've been more explicit; the shuffle master must outlive the JM. This is somewhat tricky when considering the future a bit; if we assume that different jobs or even a single one can use different shuffle services, then we need a way to associate the partitions with the corresponding shuffle master. This will likely require the introduction of a ShuffleMasterID that is included in the ShuffleDescriptor. 7) Handover /Till: The handover logic between the JM and the RM for the global/persistent// //result partitions seems a bit brittle to me. What will happen if the JM// //cannot reach the RM? I think it would be better if the TM announces the// //global/persistent result partitions to the RM via its heartbeats. That way// //we don't rely on an established connection between the JM and RM and we// //keep the TM as the ground of truth. Moreover, the RM should simply forward// //the release calls to the TM without much internal logic./ As for your question, if the JM cannot reach the RM the handover will fail, the JM will likely shutdown without promoting any partition and the TE will release all partitions. What is the defined behavior for the JM in case of the RM disconnect after a job has finished? Does it always/sometimes/never shutdown with/-out communicating the result to the client / updating HA data; or simply put, does the JM behave to the user as if nothing has happened in all cases? A heartbeat-based approach is useful and can alleviate some failure cases (see below); but we need to make sure we don't exceed the akka framesize or otherwise interfere with the heartbeat mechanism (like we did with metrics in the past). Ideally we would only submit updates to the partition set (added/removed partitions), but I'm not sure if the heartbeats are reliable enough for this to work. 8. Failure cases: /Becket:/ /a) The TEs may remove the result partition while the RM does not// //know. In this case, the client will receive a runtime error and submit the// //full DAG to recompute the missing result partition. In this case, RM should// //release the incomplete global partition. How would RM be notified to do// //that?// //b) Is it possible the RM looses global partition metadata while// //the TE still host the data? For example, RM deletes the global partition// //entry while the release partition call to TE failed.// //c) What would happen if the JM fails before the global partitions// //are registered to RM? Are users exposed to resource leak if JM does not// //have HA?// //d) What would happen if the RM fails? Will TE release the// //partitions by themselves?/ 1.a) This is a good question that I haven't considered. This will likely require a heartbeat-like report of available partitions. 1.b) RM should only delete entries if it received an ack from the TE; otherwise we could easily end up leaking partitions. I believe I forgot writing this down. 1.c) As described in the FLIP the handoff to the RM must occur before partitions are promoted. If the JM fails during the handoff then the TE will cleanup all partitions since it lost the connection to the JM, and partitions weren't promoted yet. If the JM fails after the handoff but before the promotion, same as above. The RM would contain invalid entries in this case; see 1.a) . If the JM fails after the handoff and promotion partitions we don't leak anything since the RM is now fully responsible. 1.d) yes; if the connection to the RM is disrupted the TE will cleanup all global partitions, similar to how it cleans up all partitions associated with a given job if the connection to the corresponding JM is disrupted. 9. /Becket: It looks that TE should be the source of truth of the result partition// //existence. Does it have to distinguish between global and local result// //partitions? If TE does not need to distinguish them, it seems the the// //releasePartition() method in TE could just provide the list of partitions// //to release, without the partitions to promote./ The promotion is a hard requirement, as this is the signal to the TE that this partition is no longer bound to the life-cycle of a job. Without the promotion the TE would delete the partition once the JM has shutdown; this is a safety net to ensure cleanup of partitions in case of a disconnect. 10. /In the current design, RM should be able to release result// //partitions using ShuffleService. Will RM do this by sending RPC to the TEs?// //Or will the RM do it by itself?/ The RM will send a release call to each TM and issue a release call to the ShuffleMaster, just like the JobMaster handles partition releases. 11. /Becket: How do we plan to handle the case when there are different shuffle// //services in the same Flink cluster? For example, a shared standalone// //cluster./ This case is not considered; there are so many changes necessary in other parts of the runtime that we would jump the gun in addressing it here. Ultimately though, I would think that that the addition of a shuffle master instance ID and shuffle service identifier should suffice. The identifier is used in subsequent jobs to load the appropriate shuffle service for a given partition (think of it like a class name), while the shuffle master instance ID is used to differentiate between the different shuffle master instances running in the cluster (which partitions have to be associated with so we can issue the correct release calls). 12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is there a// //reason we use `:` instead?/ That's netty syntax for path parameters. On 30/09/2019 08:34, Becket Qin wrote: > Forgot to say that I agree with Till that it seems a good idea to let TEs > register the global partitions to the RM instead of letting JM do it. This > simplifies quite a few things. > > Thanks, > > Jiangjie (Becket) Qin > > On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <[hidden email]> wrote: > >> Hi Chesnay, >> >> Thanks for the proposal. My understanding of the entire workflow step by >> step is following: >> >> - JM maintains the local and global partition metadata when the task >> runs to create result partitions. The tasks themselves does not distinguish >> between local / global partitions. Only the JM knows that. >> - JM releases the local partitions as the job executes. When a job >> finishes successfully, JM registers the global partitions to the RM. The >> global partition IDs are set on the client instead of randomly generated, >> so the client can release global partitions using them. (It would be good >> to have some human readable string associated with the global result >> partitions). >> - Client issues REST call to list / release global partitions. >> >> A few thoughts / questions below: >> 1. Failure cases: >> * The TEs may remove the result partition while the RM does not >> know. In this case, the client will receive a runtime error and submit the >> full DAG to recompute the missing result partition. In this case, RM should >> release the incomplete global partition. How would RM be notified to do >> that? >> * Is it possible the RM looses global partition metadata while >> the TE still host the data? For example, RM deletes the global partition >> entry while the release partition call to TE failed. >> * What would happen if the JM fails before the global partitions >> are registered to RM? Are users exposed to resource leak if JM does not >> have HA? >> * What would happen if the RM fails? Will TE release the >> partitions by themselves? >> >> 2. It looks that TE should be the source of truth of the result partition >> existence. Does it have to distinguish between global and local result >> partitions? If TE does not need to distinguish them, it seems the the >> releasePartition() method in TE could just provide the list of partitions >> to release, without the partitions to promote. >> >> 3. In the current design, RM should be able to release result >> partitions using ShuffleService. Will RM do this by sending RPC to the TEs? >> Or will the RM do it by itself? >> >> 4. How do we plan to handle the case when there are different shuffle >> services in the same Flink cluster? For example, a shared standalone >> cluster. >> >> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a >> reason we use `:` instead? >> >> Thanks, >> >> Jiangjie (Becket) Qin >> >> On Tue, Sep 17, 2019 at 3:22 AM zhijiang >> <[hidden email]> wrote: >> >>> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on my >>> side. >>> >>> I also have some similar concerns which Till already proposed before. >>> >>> 1. The consistent terminology in different components. On JM side, >>> PartitionTracker#getPersistedBlockingPartitions is defined for getting >>> global partitions. And on RM side, we define the method of >>> #registerGlobalPartitions correspondingly for handover the partitions from >>> JM. I think it is better to unify the term in different components for for >>> better understanding the semantic. Concering whether to use global or >>> persistent, I prefer the "global" term personally. Because it describes the >>> scope of partition clearly, and the "persistent" is more like the partition >>> storing way or implementation detail. In other words, the global partition >>> might also be cached in memory of TE, not must persist into files from >>> semantic requirements. Whether memory or persistent file is just the >>> implementation choice. >>> >>> 2. On TE side, we might rename the method #releasePartitions to >>> #releaseOrPromotePartitions which describes the function precisely and >>> keeps consistent with >>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). >>> >>> 3. Very agree with Till's suggestions of global PartitionTable on TE side >>> and sticking to TE's heartbeat report to RM for global partitions. >>> >>> 4. Considering ShuffleMaster, it was built inside JM and expected to >>> interactive with JM before. Now the RM also needs to interactive with >>> ShuffleMaster to release global partitions. Then it might be better to move >>> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should be >>> consistent with RM. >>> >>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global >>> partitions for successful jobs" >>> >>> Best, >>> Zhijiang >>> >>> >>> ------------------------------------------------------------------ >>> From:Till Rohrmann <[hidden email]> >>> Send Time:2019年9月10日(星期二) 10:10 >>> To:dev <[hidden email]> >>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle >>> >>> Thanks Chesnay for drafting the FLIP and starting this discussion. >>> >>> I have a couple of comments: >>> >>> * I know that I've also coined the terms global/local result partition but >>> maybe it is not the perfect name. Maybe we could rethink the terminology >>> and call them persistent result partitions? >>> * Nit: I would call the last parameter of void releasePartitions(JobID >>> jobId, Collection<ResultPartitionID> partitionsToRelease, >>> Collection<ResultPartitionID> partitionsToPromote) either >>> partitionsToRetain or partitionsToPersistent. >>> * I'm not sure whether partitionsToRelease should contain a >>> global/persistent result partition id. I always thought that the user will >>> be responsible for managing the lifecycle of a global/persistent >>> result partition. >>> * Instead of extending the PartitionTable to be able to store >>> global/persistent and local/transient result partitions, I would rather >>> introduce a global PartitionTable to store the global/persistent result >>> partitions explicitly. I think there is a benefit in making things as >>> explicit as possible. >>> * The handover logic between the JM and the RM for the global/persistent >>> result partitions seems a bit brittle to me. What will happen if the JM >>> cannot reach the RM? I think it would be better if the TM announces the >>> global/persistent result partitions to the RM via its heartbeats. That way >>> we don't rely on an established connection between the JM and RM and we >>> keep the TM as the ground of truth. Moreover, the RM should simply forward >>> the release calls to the TM without much internal logic. >>> >>> Cheers, >>> Till >>> >>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> >>> wrote: >>> >>>> Hello, >>>> >>>> FLIP-36 (interactive programming) >>>> < >>>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink >>>> >>>> proposes a new programming paradigm where jobs are built incrementally >>>> by the user. >>>> >>>> To support this in an efficient manner I propose to extend partition >>>> life-cycle to support the notion of /global partitions/, which are >>>> partitions that can exist beyond the life-time of a job. >>>> >>>> These partitions could then be re-used by subsequent jobs in a fairly >>>> efficient manner, as they don't have to persisted to an external storage >>>> first and consuming tasks could be scheduled to exploit data-locality. >>>> >>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor >>>> and ResourceManager to support this from a life-cycle perspective. >>>> >>>> This FLIP does /not/ concern itself with the /usage/ of global >>>> partitions, including client-side APIs, job-submission, scheduling and >>>> reading said partitions; these are all follow-ups that will either be >>>> part of FLIP-36 or spliced out into separate FLIPs. >>>> >>>> >>> |
Thanks for addressing our comments Chesnay. See some comments inline.
On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <[hidden email]> wrote: > Thank you for your comments; I've aggregated them a bit and added > comments to each of them. > > 1) Concept name (proposal: persistent) > > I agree that "global" is rather undescriptive, particularly so since we > never had a notion of "local" partitions. > I'm not a fan of "persistent"; as to me this always implies reliable > long-term storage which as I understand we aren't shooting for here. > > I was thinking of "cached" partitions. > > To Zhijiangs point, we should of course make the naming consistent > everywhere. > > 2) Naming of last parameter of TE#releasePartitions (proposal: > partitionsToRetain / partitionsToPersistent) > > I can see where you're coming from ("promote" is somewhat abstract), but > I think both suggestions have downsides. > > "partitionsToPersistent" to me implies an additional write operation to > somewhere, but we aren't doing that. > "partitionsToRetain" kind of results in a redundancy with the other > argument since retaining is the opposite to releasing a partition; if I > want to retain a partition, why am I not just excluding it from the set > to release? > > I quite like "promote" personally; we fundamentally change how the > lifecycle for these partitions work, and introducing new keywords isn't > a inherently a bad choice. > > 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; > Note: addition of "OrPromote" is dependent on 2) ) > > Good point. > > 4) /Till: I'm not sure whether partitionsToRelease should contain a// > //global/persistent result partition id. I always thought that the user > will// > //be responsible for managing the lifecycle of a global/persistent// > //result partition./ > > @Till Please elaborate; which method/argument are you referring to? > In the FLIP you wrote "The set of partitions to release may contain local and/or global partitions; the promotion set must only refer to local partitions." to describe the `releasePartitions`. I think the JM should never be in the situation to release a global partition. Moreover, I believe we should have a separate RPC to release global result partitions which might come from the RM. > > 4)/Dedicated PartitionTable for global partitions/ > > Since there is only one RM for each TE a PartitionTable is unnecessary; > a simple set will suffice. > Alternatively, we could introduce such a dedicated set into the > PartitionTable to keep these data-structures close. > > 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs > retain global partitions for successful jobs"/ > > Will fix it. > > 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and > expected to interactive with JM before. Now the RM also needs to > interactive with ShuffleMaster to release global partitions. Then it > might be better to move ShuffleMaster outside of JM, and the lifecycle > of ShuffleMaster should be consistent with RM./ > > Yes, I alluded to this in the FLIP but should've been more explicit; the > shuffle master must outlive the JM. This is somewhat tricky when > considering the future a bit; if we assume that different jobs or even a > single one can use different shuffle services, then we need a way to > associate the partitions with the corresponding shuffle master. This > will likely require the introduction of a ShuffleMasterID that is > included in the ShuffleDescriptor. > > 7) Handover > > /Till: The handover logic between the JM and the RM for the > global/persistent// > //result partitions seems a bit brittle to me. What will happen if the JM// > //cannot reach the RM? I think it would be better if the TM announces the// > //global/persistent result partitions to the RM via its heartbeats. That > way// > //we don't rely on an established connection between the JM and RM and we// > //keep the TM as the ground of truth. Moreover, the RM should simply > forward// > //the release calls to the TM without much internal logic./ > > As for your question, if the JM cannot reach the RM the handover will > fail, the JM will likely shutdown without promoting any partition and > the TE will release all partitions. > What is the defined behavior for the JM in case of the RM disconnect > after a job has finished? Does it always/sometimes/never shutdown > with/-out communicating the result to the client / updating HA data; > or simply put, does the JM behave to the user as if nothing has happened > in all cases? > Once the JM has obtained the required slots to run a job, it no longer needs to communicate with the RM. Hence, a lost RM connection won't interfere with the job. I would like to keep it like this by letting the TE announce global result partitions to the RM and not to introduce another communication roundtrip. > > A heartbeat-based approach is useful and can alleviate some failure > cases (see below); but we need to make sure we don't exceed the akka > framesize or otherwise interfere with the heartbeat mechanism (like we > did with metrics in the past). Ideally we would only submit updates to > the partition set (added/removed partitions), but I'm not sure if the > heartbeats are reliable enough for this to work. > How big do you expect the payload to become? > > 8. Failure cases: > /Becket:/ > /a) The TEs may remove the result partition while the RM does not// > //know. In this case, the client will receive a runtime error and submit > the// > //full DAG to recompute the missing result partition. In this case, RM > should// > //release the incomplete global partition. How would RM be notified to do// > //that?// > //b) Is it possible the RM looses global partition metadata while// > //the TE still host the data? For example, RM deletes the global > partition// > //entry while the release partition call to TE failed.// > //c) What would happen if the JM fails before the global partitions// > //are registered to RM? Are users exposed to resource leak if JM does not// > //have HA?// > //d) What would happen if the RM fails? Will TE release the// > //partitions by themselves?/ > > 1.a) This is a good question that I haven't considered. This will likely > require a heartbeat-like report of available partitions. > The hearbeat based synchronization approach seems to crystalize as the way to go forward with this FLIP. > 1.b) RM should only delete entries if it received an ack from the TE; > otherwise we could easily end up leaking partitions. I believe I forgot > writing this down. > 1.c) As described in the FLIP the handoff to the RM must occur before > partitions are promoted. > If the JM fails during the handoff then the TE will cleanup all > partitions since it lost the connection to the JM, and partitions > weren't promoted yet. > If the JM fails after the handoff but before the promotion, same as > above. The RM would contain invalid entries in this case; see 1.a) . > If the JM fails after the handoff and promotion partitions we don't > leak anything since the RM is now fully responsible. > 1.d) yes; if the connection to the RM is disrupted the TE will cleanup > all global partitions, similar to how it cleans up all partitions > associated with a given job if the connection to the corresponding JM is > disrupted. > > 9. /Becket: It looks that TE should be the source of truth of the result > partition// > //existence. Does it have to distinguish between global and local result// > //partitions? If TE does not need to distinguish them, it seems the the// > //releasePartition() method in TE could just provide the list of > partitions// > //to release, without the partitions to promote./ > > The promotion is a hard requirement, as this is the signal to the TE > that this partition is no longer bound to the life-cycle of a job. > Without the promotion the TE would delete the partition once the JM has > shutdown; this is a safety net to ensure cleanup of partitions in case > of a disconnect. > > 10. /In the current design, RM should be able to release result// > //partitions using ShuffleService. Will RM do this by sending RPC to the > TEs?// > //Or will the RM do it by itself?/ > > The RM will send a release call to each TM and issue a release call to > the ShuffleMaster, just like the JobMaster handles partition releases. > > 11. /Becket: How do we plan to handle the case when there are different > shuffle// > //services in the same Flink cluster? For example, a shared standalone// > //cluster./ > > This case is not considered; there are so many changes necessary in > other parts of the runtime that we would jump the gun in addressing it > here. > Ultimately though, I would think that that the addition of a shuffle > master instance ID and shuffle service identifier should suffice. > > The identifier is used in subsequent jobs to load the appropriate > shuffle service for a given partition (think of it like a class name), > while the shuffle master instance ID is used to differentiate between > the different shuffle master instances running in the cluster (which > partitions have to be associated with so we can issue the correct > release calls). > > 12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is > there a// > //reason we use `:` instead?/ > > That's netty syntax for path parameters. > > On 30/09/2019 08:34, Becket Qin wrote: > > Forgot to say that I agree with Till that it seems a good idea to let TEs > > register the global partitions to the RM instead of letting JM do it. > This > > simplifies quite a few things. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <[hidden email]> > wrote: > > > >> Hi Chesnay, > >> > >> Thanks for the proposal. My understanding of the entire workflow step by > >> step is following: > >> > >> - JM maintains the local and global partition metadata when the task > >> runs to create result partitions. The tasks themselves does not > distinguish > >> between local / global partitions. Only the JM knows that. > >> - JM releases the local partitions as the job executes. When a job > >> finishes successfully, JM registers the global partitions to the RM. The > >> global partition IDs are set on the client instead of randomly > generated, > >> so the client can release global partitions using them. (It would be > good > >> to have some human readable string associated with the global result > >> partitions). > >> - Client issues REST call to list / release global partitions. > >> > >> A few thoughts / questions below: > >> 1. Failure cases: > >> * The TEs may remove the result partition while the RM does > not > >> know. In this case, the client will receive a runtime error and submit > the > >> full DAG to recompute the missing result partition. In this case, RM > should > >> release the incomplete global partition. How would RM be notified to do > >> that? > >> * Is it possible the RM looses global partition metadata > while > >> the TE still host the data? For example, RM deletes the global partition > >> entry while the release partition call to TE failed. > >> * What would happen if the JM fails before the global > partitions > >> are registered to RM? Are users exposed to resource leak if JM does not > >> have HA? > >> * What would happen if the RM fails? Will TE release the > >> partitions by themselves? > >> > >> 2. It looks that TE should be the source of truth of the result > partition > >> existence. Does it have to distinguish between global and local result > >> partitions? If TE does not need to distinguish them, it seems the the > >> releasePartition() method in TE could just provide the list of > partitions > >> to release, without the partitions to promote. > >> > >> 3. In the current design, RM should be able to release result > >> partitions using ShuffleService. Will RM do this by sending RPC to the > TEs? > >> Or will the RM do it by itself? > >> > >> 4. How do we plan to handle the case when there are different shuffle > >> services in the same Flink cluster? For example, a shared standalone > >> cluster. > >> > >> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a > >> reason we use `:` instead? > >> > >> Thanks, > >> > >> Jiangjie (Becket) Qin > >> > >> On Tue, Sep 17, 2019 at 3:22 AM zhijiang > >> <[hidden email]> wrote: > >> > >>> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on > my > >>> side. > >>> > >>> I also have some similar concerns which Till already proposed before. > >>> > >>> 1. The consistent terminology in different components. On JM side, > >>> PartitionTracker#getPersistedBlockingPartitions is defined for getting > >>> global partitions. And on RM side, we define the method of > >>> #registerGlobalPartitions correspondingly for handover the partitions > from > >>> JM. I think it is better to unify the term in different components for > for > >>> better understanding the semantic. Concering whether to use global or > >>> persistent, I prefer the "global" term personally. Because it > describes the > >>> scope of partition clearly, and the "persistent" is more like the > partition > >>> storing way or implementation detail. In other words, the global > partition > >>> might also be cached in memory of TE, not must persist into files from > >>> semantic requirements. Whether memory or persistent file is just the > >>> implementation choice. > >>> > >>> 2. On TE side, we might rename the method #releasePartitions to > >>> #releaseOrPromotePartitions which describes the function precisely and > >>> keeps consistent with > >>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). > >>> > >>> 3. Very agree with Till's suggestions of global PartitionTable on TE > side > >>> and sticking to TE's heartbeat report to RM for global partitions. > >>> > >>> 4. Considering ShuffleMaster, it was built inside JM and expected to > >>> interactive with JM before. Now the RM also needs to interactive with > >>> ShuffleMaster to release global partitions. Then it might be better to > move > >>> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should > be > >>> consistent with RM. > >>> > >>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global > >>> partitions for successful jobs" > >>> > >>> Best, > >>> Zhijiang > >>> > >>> > >>> ------------------------------------------------------------------ > >>> From:Till Rohrmann <[hidden email]> > >>> Send Time:2019年9月10日(星期二) 10:10 > >>> To:dev <[hidden email]> > >>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle > >>> > >>> Thanks Chesnay for drafting the FLIP and starting this discussion. > >>> > >>> I have a couple of comments: > >>> > >>> * I know that I've also coined the terms global/local result partition > but > >>> maybe it is not the perfect name. Maybe we could rethink the > terminology > >>> and call them persistent result partitions? > >>> * Nit: I would call the last parameter of void releasePartitions(JobID > >>> jobId, Collection<ResultPartitionID> partitionsToRelease, > >>> Collection<ResultPartitionID> partitionsToPromote) either > >>> partitionsToRetain or partitionsToPersistent. > >>> * I'm not sure whether partitionsToRelease should contain a > >>> global/persistent result partition id. I always thought that the user > will > >>> be responsible for managing the lifecycle of a global/persistent > >>> result partition. > >>> * Instead of extending the PartitionTable to be able to store > >>> global/persistent and local/transient result partitions, I would rather > >>> introduce a global PartitionTable to store the global/persistent result > >>> partitions explicitly. I think there is a benefit in making things as > >>> explicit as possible. > >>> * The handover logic between the JM and the RM for the > global/persistent > >>> result partitions seems a bit brittle to me. What will happen if the JM > >>> cannot reach the RM? I think it would be better if the TM announces the > >>> global/persistent result partitions to the RM via its heartbeats. That > way > >>> we don't rely on an established connection between the JM and RM and we > >>> keep the TM as the ground of truth. Moreover, the RM should simply > forward > >>> the release calls to the TM without much internal logic. > >>> > >>> Cheers, > >>> Till > >>> > >>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> > >>> wrote: > >>> > >>>> Hello, > >>>> > >>>> FLIP-36 (interactive programming) > >>>> < > >>>> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink > >>>> > >>>> proposes a new programming paradigm where jobs are built incrementally > >>>> by the user. > >>>> > >>>> To support this in an efficient manner I propose to extend partition > >>>> life-cycle to support the notion of /global partitions/, which are > >>>> partitions that can exist beyond the life-time of a job. > >>>> > >>>> These partitions could then be re-used by subsequent jobs in a fairly > >>>> efficient manner, as they don't have to persisted to an external > storage > >>>> first and consuming tasks could be scheduled to exploit data-locality. > >>>> > >>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor > >>>> and ResourceManager to support this from a life-cycle perspective. > >>>> > >>>> This FLIP does /not/ concern itself with the /usage/ of global > >>>> partitions, including client-side APIs, job-submission, scheduling and > >>>> reading said partitions; these are all follow-ups that will either be > >>>> part of FLIP-36 or spliced out into separate FLIPs. > >>>> > >>>> > >>> > > |
/Till: In the FLIP you wrote "The set of partitions to release may
contain local and/or global partitions; the promotion set must only refer to local partitions." to describe the `releasePartitions`. I think the JM should never be in the situation to release a global partition. Moreover, I believe we should have a separate RPC to release global result partitions which might come from the RM./ We can certainly add a separate RPC method for explicitly releasing global partitions. You are correct that the JM should not be able to release those, just like the RM should not be able to release non-global partitions. /Till: Once the JM has obtained the required slots to run a job, it no longer needs to communicate with the RM. Hence, a lost RM connection won't interfere with the job. I would like to keep it like this by letting the TE announce global result partitions to the RM and not to introduce another communication roundtrip. /Agreed, this is a nice property to retain. /Till: How big do you expect the payload to become? /I don't know, which is precisely why I want to be cautious about it. The last time I made a similar assumption I didn't expect anyone to have hundreds of thousands of metrics on a single TM, which turned out to be wrong. I wouldn't exclude the possibility of a similar number of partitions being hosted on a single TE. One problem we have to solve with the heartbeat-based approach is that partitions may be lost without the TE noticing, due to disk-failures or external delete operations. Currently, for scheduling purposes we rely on information stored in the JM, and update said information if a job fails due to a missing partition. However, IIRC the JM is informed about with an exception that is thrown by the consumer of said partition, not the producer. As far as the producing TM is concerned, it is still hosting that partition. This means we have to forward errors for missing partitions from the network stack on the producers side to the TE, so that it can inform the RM about it. // On 02/10/2019 16:21, Till Rohrmann wrote: > Thanks for addressing our comments Chesnay. See some comments inline. > > On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <[hidden email]> wrote: > >> Thank you for your comments; I've aggregated them a bit and added >> comments to each of them. >> >> 1) Concept name (proposal: persistent) >> >> I agree that "global" is rather undescriptive, particularly so since we >> never had a notion of "local" partitions. >> I'm not a fan of "persistent"; as to me this always implies reliable >> long-term storage which as I understand we aren't shooting for here. >> >> I was thinking of "cached" partitions. >> >> To Zhijiangs point, we should of course make the naming consistent >> everywhere. >> >> 2) Naming of last parameter of TE#releasePartitions (proposal: >> partitionsToRetain / partitionsToPersistent) >> >> I can see where you're coming from ("promote" is somewhat abstract), but >> I think both suggestions have downsides. >> >> "partitionsToPersistent" to me implies an additional write operation to >> somewhere, but we aren't doing that. >> "partitionsToRetain" kind of results in a redundancy with the other >> argument since retaining is the opposite to releasing a partition; if I >> want to retain a partition, why am I not just excluding it from the set >> to release? >> >> I quite like "promote" personally; we fundamentally change how the >> lifecycle for these partitions work, and introducing new keywords isn't >> a inherently a bad choice. >> >> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; >> Note: addition of "OrPromote" is dependent on 2) ) >> >> Good point. >> >> 4) /Till: I'm not sure whether partitionsToRelease should contain a// >> //global/persistent result partition id. I always thought that the user >> will// >> //be responsible for managing the lifecycle of a global/persistent// >> //result partition./ >> >> @Till Please elaborate; which method/argument are you referring to? >> > In the FLIP you wrote "The set of partitions to release may contain local > and/or global partitions; the promotion set must only refer to local > partitions." to describe the `releasePartitions`. I think the JM should > never be in the situation to release a global partition. Moreover, I > believe we should have a separate RPC to release global result partitions > which might come from the RM. > >> 4)/Dedicated PartitionTable for global partitions/ >> >> Since there is only one RM for each TE a PartitionTable is unnecessary; >> a simple set will suffice. >> Alternatively, we could introduce such a dedicated set into the >> PartitionTable to keep these data-structures close. >> >> 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs >> retain global partitions for successful jobs"/ >> >> Will fix it. >> >> 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and >> expected to interactive with JM before. Now the RM also needs to >> interactive with ShuffleMaster to release global partitions. Then it >> might be better to move ShuffleMaster outside of JM, and the lifecycle >> of ShuffleMaster should be consistent with RM./ >> >> Yes, I alluded to this in the FLIP but should've been more explicit; the >> shuffle master must outlive the JM. This is somewhat tricky when >> considering the future a bit; if we assume that different jobs or even a >> single one can use different shuffle services, then we need a way to >> associate the partitions with the corresponding shuffle master. This >> will likely require the introduction of a ShuffleMasterID that is >> included in the ShuffleDescriptor. >> >> 7) Handover >> >> /Till: The handover logic between the JM and the RM for the >> global/persistent// >> //result partitions seems a bit brittle to me. What will happen if the JM// >> //cannot reach the RM? I think it would be better if the TM announces the// >> //global/persistent result partitions to the RM via its heartbeats. That >> way// >> //we don't rely on an established connection between the JM and RM and we// >> //keep the TM as the ground of truth. Moreover, the RM should simply >> forward// >> //the release calls to the TM without much internal logic./ >> >> As for your question, if the JM cannot reach the RM the handover will >> fail, the JM will likely shutdown without promoting any partition and >> the TE will release all partitions. >> What is the defined behavior for the JM in case of the RM disconnect >> after a job has finished? Does it always/sometimes/never shutdown >> with/-out communicating the result to the client / updating HA data; >> or simply put, does the JM behave to the user as if nothing has happened >> in all cases? >> > Once the JM has obtained the required slots to run a job, it no longer > needs to communicate with the RM. Hence, a lost RM connection won't > interfere with the job. I would like to keep it like this by letting the TE > announce global result partitions to the RM and not to introduce another > communication roundtrip. > >> A heartbeat-based approach is useful and can alleviate some failure >> cases (see below); but we need to make sure we don't exceed the akka >> framesize or otherwise interfere with the heartbeat mechanism (like we >> did with metrics in the past). Ideally we would only submit updates to >> the partition set (added/removed partitions), but I'm not sure if the >> heartbeats are reliable enough for this to work. >> > How big do you expect the payload to become? > >> 8. Failure cases: >> /Becket:/ >> /a) The TEs may remove the result partition while the RM does not// >> //know. In this case, the client will receive a runtime error and submit >> the// >> //full DAG to recompute the missing result partition. In this case, RM >> should// >> //release the incomplete global partition. How would RM be notified to do// >> //that?// >> //b) Is it possible the RM looses global partition metadata while// >> //the TE still host the data? For example, RM deletes the global >> partition// >> //entry while the release partition call to TE failed.// >> //c) What would happen if the JM fails before the global partitions// >> //are registered to RM? Are users exposed to resource leak if JM does not// >> //have HA?// >> //d) What would happen if the RM fails? Will TE release the// >> //partitions by themselves?/ >> >> 1.a) This is a good question that I haven't considered. This will likely >> require a heartbeat-like report of available partitions. >> > The hearbeat based synchronization approach seems to crystalize as the way > to go forward with this FLIP. > > >> 1.b) RM should only delete entries if it received an ack from the TE; >> otherwise we could easily end up leaking partitions. I believe I forgot >> writing this down. >> 1.c) As described in the FLIP the handoff to the RM must occur before >> partitions are promoted. >> If the JM fails during the handoff then the TE will cleanup all >> partitions since it lost the connection to the JM, and partitions >> weren't promoted yet. >> If the JM fails after the handoff but before the promotion, same as >> above. The RM would contain invalid entries in this case; see 1.a) . >> If the JM fails after the handoff and promotion partitions we don't >> leak anything since the RM is now fully responsible. >> 1.d) yes; if the connection to the RM is disrupted the TE will cleanup >> all global partitions, similar to how it cleans up all partitions >> associated with a given job if the connection to the corresponding JM is >> disrupted. >> >> 9. /Becket: It looks that TE should be the source of truth of the result >> partition// >> //existence. Does it have to distinguish between global and local result// >> //partitions? If TE does not need to distinguish them, it seems the the// >> //releasePartition() method in TE could just provide the list of >> partitions// >> //to release, without the partitions to promote./ >> >> The promotion is a hard requirement, as this is the signal to the TE >> that this partition is no longer bound to the life-cycle of a job. >> Without the promotion the TE would delete the partition once the JM has >> shutdown; this is a safety net to ensure cleanup of partitions in case >> of a disconnect. >> >> 10. /In the current design, RM should be able to release result// >> //partitions using ShuffleService. Will RM do this by sending RPC to the >> TEs?// >> //Or will the RM do it by itself?/ >> >> The RM will send a release call to each TM and issue a release call to >> the ShuffleMaster, just like the JobMaster handles partition releases. >> >> 11. /Becket: How do we plan to handle the case when there are different >> shuffle// >> //services in the same Flink cluster? For example, a shared standalone// >> //cluster./ >> >> This case is not considered; there are so many changes necessary in >> other parts of the runtime that we would jump the gun in addressing it >> here. >> Ultimately though, I would think that that the addition of a shuffle >> master instance ID and shuffle service identifier should suffice. >> >> The identifier is used in subsequent jobs to load the appropriate >> shuffle service for a given partition (think of it like a class name), >> while the shuffle master instance ID is used to differentiate between >> the different shuffle master instances running in the cluster (which >> partitions have to be associated with so we can issue the correct >> release calls). >> >> 12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is >> there a// >> //reason we use `:` instead?/ >> >> That's netty syntax for path parameters. >> >> On 30/09/2019 08:34, Becket Qin wrote: >>> Forgot to say that I agree with Till that it seems a good idea to let TEs >>> register the global partitions to the RM instead of letting JM do it. >> This >>> simplifies quite a few things. >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <[hidden email]> >> wrote: >>>> Hi Chesnay, >>>> >>>> Thanks for the proposal. My understanding of the entire workflow step by >>>> step is following: >>>> >>>> - JM maintains the local and global partition metadata when the task >>>> runs to create result partitions. The tasks themselves does not >> distinguish >>>> between local / global partitions. Only the JM knows that. >>>> - JM releases the local partitions as the job executes. When a job >>>> finishes successfully, JM registers the global partitions to the RM. The >>>> global partition IDs are set on the client instead of randomly >> generated, >>>> so the client can release global partitions using them. (It would be >> good >>>> to have some human readable string associated with the global result >>>> partitions). >>>> - Client issues REST call to list / release global partitions. >>>> >>>> A few thoughts / questions below: >>>> 1. Failure cases: >>>> * The TEs may remove the result partition while the RM does >> not >>>> know. In this case, the client will receive a runtime error and submit >> the >>>> full DAG to recompute the missing result partition. In this case, RM >> should >>>> release the incomplete global partition. How would RM be notified to do >>>> that? >>>> * Is it possible the RM looses global partition metadata >> while >>>> the TE still host the data? For example, RM deletes the global partition >>>> entry while the release partition call to TE failed. >>>> * What would happen if the JM fails before the global >> partitions >>>> are registered to RM? Are users exposed to resource leak if JM does not >>>> have HA? >>>> * What would happen if the RM fails? Will TE release the >>>> partitions by themselves? >>>> >>>> 2. It looks that TE should be the source of truth of the result >> partition >>>> existence. Does it have to distinguish between global and local result >>>> partitions? If TE does not need to distinguish them, it seems the the >>>> releasePartition() method in TE could just provide the list of >> partitions >>>> to release, without the partitions to promote. >>>> >>>> 3. In the current design, RM should be able to release result >>>> partitions using ShuffleService. Will RM do this by sending RPC to the >> TEs? >>>> Or will the RM do it by itself? >>>> >>>> 4. How do we plan to handle the case when there are different shuffle >>>> services in the same Flink cluster? For example, a shared standalone >>>> cluster. >>>> >>>> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a >>>> reason we use `:` instead? >>>> >>>> Thanks, >>>> >>>> Jiangjie (Becket) Qin >>>> >>>> On Tue, Sep 17, 2019 at 3:22 AM zhijiang >>>> <[hidden email]> wrote: >>>> >>>>> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on >> my >>>>> side. >>>>> >>>>> I also have some similar concerns which Till already proposed before. >>>>> >>>>> 1. The consistent terminology in different components. On JM side, >>>>> PartitionTracker#getPersistedBlockingPartitions is defined for getting >>>>> global partitions. And on RM side, we define the method of >>>>> #registerGlobalPartitions correspondingly for handover the partitions >> from >>>>> JM. I think it is better to unify the term in different components for >> for >>>>> better understanding the semantic. Concering whether to use global or >>>>> persistent, I prefer the "global" term personally. Because it >> describes the >>>>> scope of partition clearly, and the "persistent" is more like the >> partition >>>>> storing way or implementation detail. In other words, the global >> partition >>>>> might also be cached in memory of TE, not must persist into files from >>>>> semantic requirements. Whether memory or persistent file is just the >>>>> implementation choice. >>>>> >>>>> 2. On TE side, we might rename the method #releasePartitions to >>>>> #releaseOrPromotePartitions which describes the function precisely and >>>>> keeps consistent with >>>>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). >>>>> >>>>> 3. Very agree with Till's suggestions of global PartitionTable on TE >> side >>>>> and sticking to TE's heartbeat report to RM for global partitions. >>>>> >>>>> 4. Considering ShuffleMaster, it was built inside JM and expected to >>>>> interactive with JM before. Now the RM also needs to interactive with >>>>> ShuffleMaster to release global partitions. Then it might be better to >> move >>>>> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should >> be >>>>> consistent with RM. >>>>> >>>>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global >>>>> partitions for successful jobs" >>>>> >>>>> Best, >>>>> Zhijiang >>>>> >>>>> >>>>> ------------------------------------------------------------------ >>>>> From:Till Rohrmann <[hidden email]> >>>>> Send Time:2019年9月10日(星期二) 10:10 >>>>> To:dev <[hidden email]> >>>>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle >>>>> >>>>> Thanks Chesnay for drafting the FLIP and starting this discussion. >>>>> >>>>> I have a couple of comments: >>>>> >>>>> * I know that I've also coined the terms global/local result partition >> but >>>>> maybe it is not the perfect name. Maybe we could rethink the >> terminology >>>>> and call them persistent result partitions? >>>>> * Nit: I would call the last parameter of void releasePartitions(JobID >>>>> jobId, Collection<ResultPartitionID> partitionsToRelease, >>>>> Collection<ResultPartitionID> partitionsToPromote) either >>>>> partitionsToRetain or partitionsToPersistent. >>>>> * I'm not sure whether partitionsToRelease should contain a >>>>> global/persistent result partition id. I always thought that the user >> will >>>>> be responsible for managing the lifecycle of a global/persistent >>>>> result partition. >>>>> * Instead of extending the PartitionTable to be able to store >>>>> global/persistent and local/transient result partitions, I would rather >>>>> introduce a global PartitionTable to store the global/persistent result >>>>> partitions explicitly. I think there is a benefit in making things as >>>>> explicit as possible. >>>>> * The handover logic between the JM and the RM for the >> global/persistent >>>>> result partitions seems a bit brittle to me. What will happen if the JM >>>>> cannot reach the RM? I think it would be better if the TM announces the >>>>> global/persistent result partitions to the RM via its heartbeats. That >> way >>>>> we don't rely on an established connection between the JM and RM and we >>>>> keep the TM as the ground of truth. Moreover, the RM should simply >> forward >>>>> the release calls to the TM without much internal logic. >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> >>>>> wrote: >>>>> >>>>>> Hello, >>>>>> >>>>>> FLIP-36 (interactive programming) >>>>>> < >>>>>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink >>>>>> proposes a new programming paradigm where jobs are built incrementally >>>>>> by the user. >>>>>> >>>>>> To support this in an efficient manner I propose to extend partition >>>>>> life-cycle to support the notion of /global partitions/, which are >>>>>> partitions that can exist beyond the life-time of a job. >>>>>> >>>>>> These partitions could then be re-used by subsequent jobs in a fairly >>>>>> efficient manner, as they don't have to persisted to an external >> storage >>>>>> first and consuming tasks could be scheduled to exploit data-locality. >>>>>> >>>>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor >>>>>> and ResourceManager to support this from a life-cycle perspective. >>>>>> >>>>>> This FLIP does /not/ concern itself with the /usage/ of global >>>>>> partitions, including client-side APIs, job-submission, scheduling and >>>>>> reading said partitions; these are all follow-ups that will either be >>>>>> part of FLIP-36 or spliced out into separate FLIPs. >>>>>> >>>>>> >> |
On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler <[hidden email]> wrote:
> *Till: In the FLIP you wrote "The set of partitions to release may contain local > and/or global partitions; the promotion set must only refer to local > partitions." to describe the `releasePartitions`. I think the JM should > never be in the situation to release a global partition. Moreover, I > believe we should have a separate RPC to release global result partitions > which might come from the RM.* > > We can certainly add a separate RPC method for explicitly releasing global partitions. > You are correct that the JM should not be able to release those, just like the RM should not be able to release non-global partitions. > *Till: Once the JM has obtained the required slots to run a job, it no longer > needs to communicate with the RM. Hence, a lost RM connection won't > interfere with the job. I would like to keep it like this by letting the TE > announce global result partitions to the RM and not to introduce another > communication roundtrip. > > *Agreed, this is a nice property to retain. > *Till: How big do you expect the payload to become? > > *I don't know, which is precisely why I want to be cautious about it. > The last time I made a similar assumption I didn't expect anyone to have hundreds of thousands of metrics on a single TM, which turned out to be wrong. > I wouldn't exclude the possibility of a similar number of partitions being hosted on a single TE. > > > One problem we have to solve with the heartbeat-based approach is that partitions may be lost without the TE noticing, due to disk-failures or external delete operations. > Currently, for scheduling purposes we rely on information stored in the JM, and update said information if a job fails due to a missing partition. However, IIRC the JM is informed about with an exception that is thrown by the consumer of said partition, not the producer. As far as the producing TM is concerned, it is still hosting that partition. > This means we have to forward errors for missing partitions from the network stack on the producers side to the TE, so that it can inform the RM about it. > > the local result partitions. Cheers, Till > On 02/10/2019 16:21, Till Rohrmann wrote: > > Thanks for addressing our comments Chesnay. See some comments inline. > > On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <[hidden email]> <[hidden email]> wrote: > > > Thank you for your comments; I've aggregated them a bit and added > comments to each of them. > > 1) Concept name (proposal: persistent) > > I agree that "global" is rather undescriptive, particularly so since we > never had a notion of "local" partitions. > I'm not a fan of "persistent"; as to me this always implies reliable > long-term storage which as I understand we aren't shooting for here. > > I was thinking of "cached" partitions. > > To Zhijiangs point, we should of course make the naming consistent > everywhere. > > 2) Naming of last parameter of TE#releasePartitions (proposal: > partitionsToRetain / partitionsToPersistent) > > I can see where you're coming from ("promote" is somewhat abstract), but > I think both suggestions have downsides. > > "partitionsToPersistent" to me implies an additional write operation to > somewhere, but we aren't doing that. > "partitionsToRetain" kind of results in a redundancy with the other > argument since retaining is the opposite to releasing a partition; if I > want to retain a partition, why am I not just excluding it from the set > to release? > > I quite like "promote" personally; we fundamentally change how the > lifecycle for these partitions work, and introducing new keywords isn't > a inherently a bad choice. > > 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; > Note: addition of "OrPromote" is dependent on 2) ) > > Good point. > > 4) /Till: I'm not sure whether partitionsToRelease should contain a// > //global/persistent result partition id. I always thought that the user > will// > //be responsible for managing the lifecycle of a global/persistent// > //result partition./ > > @Till Please elaborate; which method/argument are you referring to? > > > In the FLIP you wrote "The set of partitions to release may contain local > and/or global partitions; the promotion set must only refer to local > partitions." to describe the `releasePartitions`. I think the JM should > never be in the situation to release a global partition. Moreover, I > believe we should have a separate RPC to release global result partitions > which might come from the RM. > > > 4)/Dedicated PartitionTable for global partitions/ > > Since there is only one RM for each TE a PartitionTable is unnecessary; > a simple set will suffice. > Alternatively, we could introduce such a dedicated set into the > PartitionTable to keep these data-structures close. > > 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs > retain global partitions for successful jobs"/ > > Will fix it. > > 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and > expected to interactive with JM before. Now the RM also needs to > interactive with ShuffleMaster to release global partitions. Then it > might be better to move ShuffleMaster outside of JM, and the lifecycle > of ShuffleMaster should be consistent with RM./ > > Yes, I alluded to this in the FLIP but should've been more explicit; the > shuffle master must outlive the JM. This is somewhat tricky when > considering the future a bit; if we assume that different jobs or even a > single one can use different shuffle services, then we need a way to > associate the partitions with the corresponding shuffle master. This > will likely require the introduction of a ShuffleMasterID that is > included in the ShuffleDescriptor. > > 7) Handover > > /Till: The handover logic between the JM and the RM for the > global/persistent// > //result partitions seems a bit brittle to me. What will happen if the JM// > //cannot reach the RM? I think it would be better if the TM announces the// > //global/persistent result partitions to the RM via its heartbeats. That > way// > //we don't rely on an established connection between the JM and RM and we// > //keep the TM as the ground of truth. Moreover, the RM should simply > forward// > //the release calls to the TM without much internal logic./ > > As for your question, if the JM cannot reach the RM the handover will > fail, the JM will likely shutdown without promoting any partition and > the TE will release all partitions. > What is the defined behavior for the JM in case of the RM disconnect > after a job has finished? Does it always/sometimes/never shutdown > with/-out communicating the result to the client / updating HA data; > or simply put, does the JM behave to the user as if nothing has happened > in all cases? > > > Once the JM has obtained the required slots to run a job, it no longer > needs to communicate with the RM. Hence, a lost RM connection won't > interfere with the job. I would like to keep it like this by letting the TE > announce global result partitions to the RM and not to introduce another > communication roundtrip. > > > A heartbeat-based approach is useful and can alleviate some failure > cases (see below); but we need to make sure we don't exceed the akka > framesize or otherwise interfere with the heartbeat mechanism (like we > did with metrics in the past). Ideally we would only submit updates to > the partition set (added/removed partitions), but I'm not sure if the > heartbeats are reliable enough for this to work. > > > How big do you expect the payload to become? > > > 8. Failure cases: > /Becket:/ > /a) The TEs may remove the result partition while the RM does not// > //know. In this case, the client will receive a runtime error and submit > the// > //full DAG to recompute the missing result partition. In this case, RM > should// > //release the incomplete global partition. How would RM be notified to do// > //that?// > //b) Is it possible the RM looses global partition metadata while// > //the TE still host the data? For example, RM deletes the global > partition// > //entry while the release partition call to TE failed.// > //c) What would happen if the JM fails before the global partitions// > //are registered to RM? Are users exposed to resource leak if JM does not// > //have HA?// > //d) What would happen if the RM fails? Will TE release the// > //partitions by themselves?/ > > 1.a) This is a good question that I haven't considered. This will likely > require a heartbeat-like report of available partitions. > > > The hearbeat based synchronization approach seems to crystalize as the way > to go forward with this FLIP. > > > > 1.b) RM should only delete entries if it received an ack from the TE; > otherwise we could easily end up leaking partitions. I believe I forgot > writing this down. > 1.c) As described in the FLIP the handoff to the RM must occur before > partitions are promoted. > If the JM fails during the handoff then the TE will cleanup all > partitions since it lost the connection to the JM, and partitions > weren't promoted yet. > If the JM fails after the handoff but before the promotion, same as > above. The RM would contain invalid entries in this case; see 1.a) . > If the JM fails after the handoff and promotion partitions we don't > leak anything since the RM is now fully responsible. > 1.d) yes; if the connection to the RM is disrupted the TE will cleanup > all global partitions, similar to how it cleans up all partitions > associated with a given job if the connection to the corresponding JM is > disrupted. > > 9. /Becket: It looks that TE should be the source of truth of the result > partition// > //existence. Does it have to distinguish between global and local result// > //partitions? If TE does not need to distinguish them, it seems the the// > //releasePartition() method in TE could just provide the list of > partitions// > //to release, without the partitions to promote./ > > The promotion is a hard requirement, as this is the signal to the TE > that this partition is no longer bound to the life-cycle of a job. > Without the promotion the TE would delete the partition once the JM has > shutdown; this is a safety net to ensure cleanup of partitions in case > of a disconnect. > > 10. /In the current design, RM should be able to release result// > //partitions using ShuffleService. Will RM do this by sending RPC to the > TEs?// > //Or will the RM do it by itself?/ > > The RM will send a release call to each TM and issue a release call to > the ShuffleMaster, just like the JobMaster handles partition releases. > > 11. /Becket: How do we plan to handle the case when there are different > shuffle// > //services in the same Flink cluster? For example, a shared standalone// > //cluster./ > > This case is not considered; there are so many changes necessary in > other parts of the runtime that we would jump the gun in addressing it > here. > Ultimately though, I would think that that the addition of a shuffle > master instance ID and shuffle service identifier should suffice. > > The identifier is used in subsequent jobs to load the appropriate > shuffle service for a given partition (think of it like a class name), > while the shuffle master instance ID is used to differentiate between > the different shuffle master instances running in the cluster (which > partitions have to be associated with so we can issue the correct > release calls). > > 12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is > there a// > //reason we use `:` instead?/ > > That's netty syntax for path parameters. > > On 30/09/2019 08:34, Becket Qin wrote: > > Forgot to say that I agree with Till that it seems a good idea to let TEs > register the global partitions to the RM instead of letting JM do it. > > This > > simplifies quite a few things. > > Thanks, > > Jiangjie (Becket) Qin > > On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <[hidden email]> <[hidden email]> > > wrote: > > Hi Chesnay, > > Thanks for the proposal. My understanding of the entire workflow step by > step is following: > > - JM maintains the local and global partition metadata when the task > runs to create result partitions. The tasks themselves does not > > distinguish > > between local / global partitions. Only the JM knows that. > - JM releases the local partitions as the job executes. When a job > finishes successfully, JM registers the global partitions to the RM. The > global partition IDs are set on the client instead of randomly > > generated, > > so the client can release global partitions using them. (It would be > > good > > to have some human readable string associated with the global result > partitions). > - Client issues REST call to list / release global partitions. > > A few thoughts / questions below: > 1. Failure cases: > * The TEs may remove the result partition while the RM does > > not > > know. In this case, the client will receive a runtime error and submit > > the > > full DAG to recompute the missing result partition. In this case, RM > > should > > release the incomplete global partition. How would RM be notified to do > that? > * Is it possible the RM looses global partition metadata > > while > > the TE still host the data? For example, RM deletes the global partition > entry while the release partition call to TE failed. > * What would happen if the JM fails before the global > > partitions > > are registered to RM? Are users exposed to resource leak if JM does not > have HA? > * What would happen if the RM fails? Will TE release the > partitions by themselves? > > 2. It looks that TE should be the source of truth of the result > > partition > > existence. Does it have to distinguish between global and local result > partitions? If TE does not need to distinguish them, it seems the the > releasePartition() method in TE could just provide the list of > > partitions > > to release, without the partitions to promote. > > 3. In the current design, RM should be able to release result > partitions using ShuffleService. Will RM do this by sending RPC to the > > TEs? > > Or will the RM do it by itself? > > 4. How do we plan to handle the case when there are different shuffle > services in the same Flink cluster? For example, a shared standalone > cluster. > > 5. Minor: usually REST API uses `?` to pass the parameters. Is there a > reason we use `:` instead? > > Thanks, > > Jiangjie (Becket) Qin > > On Tue, Sep 17, 2019 at 3:22 AM zhijiang<[hidden email]> <[hidden email]> wrote: > > > Thanks Chesnay for this FLIP and sorry for touching it a bit delay on > > my > > side. > > I also have some similar concerns which Till already proposed before. > > 1. The consistent terminology in different components. On JM side, > PartitionTracker#getPersistedBlockingPartitions is defined for getting > global partitions. And on RM side, we define the method of > #registerGlobalPartitions correspondingly for handover the partitions > > from > > JM. I think it is better to unify the term in different components for > > for > > better understanding the semantic. Concering whether to use global or > persistent, I prefer the "global" term personally. Because it > > describes the > > scope of partition clearly, and the "persistent" is more like the > > partition > > storing way or implementation detail. In other words, the global > > partition > > might also be cached in memory of TE, not must persist into files from > semantic requirements. Whether memory or persistent file is just the > implementation choice. > > 2. On TE side, we might rename the method #releasePartitions to > #releaseOrPromotePartitions which describes the function precisely and > keeps consistent with > PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). > > 3. Very agree with Till's suggestions of global PartitionTable on TE > > side > > and sticking to TE's heartbeat report to RM for global partitions. > > 4. Considering ShuffleMaster, it was built inside JM and expected to > interactive with JM before. Now the RM also needs to interactive with > ShuffleMaster to release global partitions. Then it might be better to > > move > > ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should > > be > > consistent with RM. > > 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global > partitions for successful jobs" > > Best, > Zhijiang > > > ------------------------------------------------------------------ > From:Till Rohrmann <[hidden email]> <[hidden email]> > Send Time:2019年9月10日(星期二) 10:10 > To:dev <[hidden email]> <[hidden email]> > Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle > > Thanks Chesnay for drafting the FLIP and starting this discussion. > > I have a couple of comments: > > * I know that I've also coined the terms global/local result partition > > but > > maybe it is not the perfect name. Maybe we could rethink the > > terminology > > and call them persistent result partitions? > * Nit: I would call the last parameter of void releasePartitions(JobID > jobId, Collection<ResultPartitionID> partitionsToRelease, > Collection<ResultPartitionID> partitionsToPromote) either > partitionsToRetain or partitionsToPersistent. > * I'm not sure whether partitionsToRelease should contain a > global/persistent result partition id. I always thought that the user > > will > > be responsible for managing the lifecycle of a global/persistent > result partition. > * Instead of extending the PartitionTable to be able to store > global/persistent and local/transient result partitions, I would rather > introduce a global PartitionTable to store the global/persistent result > partitions explicitly. I think there is a benefit in making things as > explicit as possible. > * The handover logic between the JM and the RM for the > > global/persistent > > result partitions seems a bit brittle to me. What will happen if the JM > cannot reach the RM? I think it would be better if the TM announces the > global/persistent result partitions to the RM via its heartbeats. That > > way > > we don't rely on an established connection between the JM and RM and we > keep the TM as the ground of truth. Moreover, the RM should simply > > forward > > the release calls to the TM without much internal logic. > > Cheers, > Till > > On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> <[hidden email]> > wrote: > > > Hello, > > FLIP-36 (interactive programming) > < > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink > > proposes a new programming paradigm where jobs are built incrementally > by the user. > > To support this in an efficient manner I propose to extend partition > life-cycle to support the notion of /global partitions/, which are > partitions that can exist beyond the life-time of a job. > > These partitions could then be re-used by subsequent jobs in a fairly > efficient manner, as they don't have to persisted to an external > > storage > > first and consuming tasks could be scheduled to exploit data-locality. > > The FLIP outlines the required changes on the JobMaster, TaskExecutor > and ResourceManager to support this from a life-cycle perspective. > > This FLIP does /not/ concern itself with the /usage/ of global > partitions, including client-side APIs, job-submission, scheduling and > reading said partitions; these are all follow-ups that will either be > part of FLIP-36 or spliced out into separate FLIPs. > > > > > |
I have updated the FLIP.
- consistently use "local"/"global" terminology; this incidentally should make it easier to update the terminology if we decide on other names - inform RM via heartbeats from TE about available global partitions - add dedicated method for releasing global partitions - add dedicated section for required changes to the ShuffleMaster (mostly clarification) - added some items to the "Rejected Alternatives" section - updated discussion link While writing the ShuffleMaster section I noticed the following: If, at any point, the JM/RM are moved into dedicated processes we either a) have multiple ShuffleMaster instances for the same shuffle service active b) require a single ShuffleMaster on the RM, to which JM calls are being forwarded. Neither of these are without pain-points; a) introduces additional constraints on ShuffleMaster implementations in that no local state must be kept b) again forces the JM to regularly be in touch with the RM, and limits the ShuffleMaster interface to being RPC-friendly. I'm wondering whether this issue was already an anyone's radar. On 04/10/2019 14:12, Till Rohrmann wrote: > > > On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler <[hidden email] > <mailto:[hidden email]>> wrote: > > /Till: In the FLIP you wrote "The set of partitions to release may > contain local and/or global partitions; the promotion set must > only refer to local partitions." to describe the > `releasePartitions`. I think the JM should never be in the > situation to release a global partition. Moreover, I believe we > should have a separate RPC to release global result partitions > which might come from the RM./ > > We can certainly add a separate RPC method for explicitly releasing global partitions. > You are correct that the JM should not be able to release those, just like the RM should not be able to release non-global partitions. > > /Till: Once the JM has obtained the required slots to run a job, > it no longer needs to communicate with the RM. Hence, a lost RM > connection won't interfere with the job. I would like to keep it > like this by letting the TE announce global result partitions to > the RM and not to introduce another communication roundtrip. /Agreed, this is a nice property to retain. > > /Till: How big do you expect the payload to become? /I don't know, which is precisely why I want to be cautious about it. > The last time I made a similar assumption I didn't expect anyone to have hundreds of thousands of metrics on a single TM, which turned out to be wrong. > I wouldn't exclude the possibility of a similar number of partitions being hosted on a single TE. > > > One problem we have to solve with the heartbeat-based approach is that partitions may be lost without the TE noticing, due to disk-failures or external delete operations. > Currently, for scheduling purposes we rely on information stored in the JM, and update said information if a job fails due to a missing partition. However, IIRC the JM is informed about with an exception that is thrown by the consumer of said partition, not the producer. As far as the producing TM is concerned, it is still hosting that partition. > This means we have to forward errors for missing partitions from the network stack on the producers side to the TE, so that it can inform the RM about it. > > > Yes, I think you are right Chesnay. This would also be a good addition > for the local result partitions. > > Cheers, > Till > > On 02/10/2019 16:21, Till Rohrmann wrote: >> Thanks for addressing our comments Chesnay. See some comments inline. >> >> On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler<[hidden email]> <mailto:[hidden email]> wrote: >> >>> Thank you for your comments; I've aggregated them a bit and added >>> comments to each of them. >>> >>> 1) Concept name (proposal: persistent) >>> >>> I agree that "global" is rather undescriptive, particularly so since we >>> never had a notion of "local" partitions. >>> I'm not a fan of "persistent"; as to me this always implies reliable >>> long-term storage which as I understand we aren't shooting for here. >>> >>> I was thinking of "cached" partitions. >>> >>> To Zhijiangs point, we should of course make the naming consistent >>> everywhere. >>> >>> 2) Naming of last parameter of TE#releasePartitions (proposal: >>> partitionsToRetain / partitionsToPersistent) >>> >>> I can see where you're coming from ("promote" is somewhat abstract), but >>> I think both suggestions have downsides. >>> >>> "partitionsToPersistent" to me implies an additional write operation to >>> somewhere, but we aren't doing that. >>> "partitionsToRetain" kind of results in a redundancy with the other >>> argument since retaining is the opposite to releasing a partition; if I >>> want to retain a partition, why am I not just excluding it from the set >>> to release? >>> >>> I quite like "promote" personally; we fundamentally change how the >>> lifecycle for these partitions work, and introducing new keywords isn't >>> a inherently a bad choice. >>> >>> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; >>> Note: addition of "OrPromote" is dependent on 2) ) >>> >>> Good point. >>> >>> 4) /Till: I'm not sure whether partitionsToRelease should contain a// >>> //global/persistent result partition id. I always thought that the user >>> will// >>> //be responsible for managing the lifecycle of a global/persistent// >>> //result partition./ >>> >>> @Till Please elaborate; which method/argument are you referring to? >>> >> In the FLIP you wrote "The set of partitions to release may contain local >> and/or global partitions; the promotion set must only refer to local >> partitions." to describe the `releasePartitions`. I think the JM should >> never be in the situation to release a global partition. Moreover, I >> believe we should have a separate RPC to release global result partitions >> which might come from the RM. >> >>> 4)/Dedicated PartitionTable for global partitions/ >>> >>> Since there is only one RM for each TE a PartitionTable is unnecessary; >>> a simple set will suffice. >>> Alternatively, we could introduce such a dedicated set into the >>> PartitionTable to keep these data-structures close. >>> >>> 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs >>> retain global partitions for successful jobs"/ >>> >>> Will fix it. >>> >>> 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and >>> expected to interactive with JM before. Now the RM also needs to >>> interactive with ShuffleMaster to release global partitions. Then it >>> might be better to move ShuffleMaster outside of JM, and the lifecycle >>> of ShuffleMaster should be consistent with RM./ >>> >>> Yes, I alluded to this in the FLIP but should've been more explicit; the >>> shuffle master must outlive the JM. This is somewhat tricky when >>> considering the future a bit; if we assume that different jobs or even a >>> single one can use different shuffle services, then we need a way to >>> associate the partitions with the corresponding shuffle master. This >>> will likely require the introduction of a ShuffleMasterID that is >>> included in the ShuffleDescriptor. >>> >>> 7) Handover >>> >>> /Till: The handover logic between the JM and the RM for the >>> global/persistent// >>> //result partitions seems a bit brittle to me. What will happen if the JM// >>> //cannot reach the RM? I think it would be better if the TM announces the// >>> //global/persistent result partitions to the RM via its heartbeats. That >>> way// >>> //we don't rely on an established connection between the JM and RM and we// >>> //keep the TM as the ground of truth. Moreover, the RM should simply >>> forward// >>> //the release calls to the TM without much internal logic./ >>> >>> As for your question, if the JM cannot reach the RM the handover will >>> fail, the JM will likely shutdown without promoting any partition and >>> the TE will release all partitions. >>> What is the defined behavior for the JM in case of the RM disconnect >>> after a job has finished? Does it always/sometimes/never shutdown >>> with/-out communicating the result to the client / updating HA data; >>> or simply put, does the JM behave to the user as if nothing has happened >>> in all cases? >>> >> Once the JM has obtained the required slots to run a job, it no longer >> needs to communicate with the RM. Hence, a lost RM connection won't >> interfere with the job. I would like to keep it like this by letting the TE >> announce global result partitions to the RM and not to introduce another >> communication roundtrip. >> >>> A heartbeat-based approach is useful and can alleviate some failure >>> cases (see below); but we need to make sure we don't exceed the akka >>> framesize or otherwise interfere with the heartbeat mechanism (like we >>> did with metrics in the past). Ideally we would only submit updates to >>> the partition set (added/removed partitions), but I'm not sure if the >>> heartbeats are reliable enough for this to work. >>> >> How big do you expect the payload to become? >> >>> 8. Failure cases: >>> /Becket:/ >>> /a) The TEs may remove the result partition while the RM does not// >>> //know. In this case, the client will receive a runtime error and submit >>> the// >>> //full DAG to recompute the missing result partition. In this case, RM >>> should// >>> //release the incomplete global partition. How would RM be notified to do// >>> //that?// >>> //b) Is it possible the RM looses global partition metadata while// >>> //the TE still host the data? For example, RM deletes the global >>> partition// >>> //entry while the release partition call to TE failed.// >>> //c) What would happen if the JM fails before the global partitions// >>> //are registered to RM? Are users exposed to resource leak if JM does not// >>> //have HA?// >>> //d) What would happen if the RM fails? Will TE release the// >>> //partitions by themselves?/ >>> >>> 1.a) This is a good question that I haven't considered. This will likely >>> require a heartbeat-like report of available partitions. >>> >> The hearbeat based synchronization approach seems to crystalize as the way >> to go forward with this FLIP. >> >> >>> 1.b) RM should only delete entries if it received an ack from the TE; >>> otherwise we could easily end up leaking partitions. I believe I forgot >>> writing this down. >>> 1.c) As described in the FLIP the handoff to the RM must occur before >>> partitions are promoted. >>> If the JM fails during the handoff then the TE will cleanup all >>> partitions since it lost the connection to the JM, and partitions >>> weren't promoted yet. >>> If the JM fails after the handoff but before the promotion, same as >>> above. The RM would contain invalid entries in this case; see 1.a) . >>> If the JM fails after the handoff and promotion partitions we don't >>> leak anything since the RM is now fully responsible. >>> 1.d) yes; if the connection to the RM is disrupted the TE will cleanup >>> all global partitions, similar to how it cleans up all partitions >>> associated with a given job if the connection to the corresponding JM is >>> disrupted. >>> >>> 9. /Becket: It looks that TE should be the source of truth of the result >>> partition// >>> //existence. Does it have to distinguish between global and local result// >>> //partitions? If TE does not need to distinguish them, it seems the the// >>> //releasePartition() method in TE could just provide the list of >>> partitions// >>> //to release, without the partitions to promote./ >>> >>> The promotion is a hard requirement, as this is the signal to the TE >>> that this partition is no longer bound to the life-cycle of a job. >>> Without the promotion the TE would delete the partition once the JM has >>> shutdown; this is a safety net to ensure cleanup of partitions in case >>> of a disconnect. >>> >>> 10. /In the current design, RM should be able to release result// >>> //partitions using ShuffleService. Will RM do this by sending RPC to the >>> TEs?// >>> //Or will the RM do it by itself?/ >>> >>> The RM will send a release call to each TM and issue a release call to >>> the ShuffleMaster, just like the JobMaster handles partition releases. >>> >>> 11. /Becket: How do we plan to handle the case when there are different >>> shuffle// >>> //services in the same Flink cluster? For example, a shared standalone// >>> //cluster./ >>> >>> This case is not considered; there are so many changes necessary in >>> other parts of the runtime that we would jump the gun in addressing it >>> here. >>> Ultimately though, I would think that that the addition of a shuffle >>> master instance ID and shuffle service identifier should suffice. >>> >>> The identifier is used in subsequent jobs to load the appropriate >>> shuffle service for a given partition (think of it like a class name), >>> while the shuffle master instance ID is used to differentiate between >>> the different shuffle master instances running in the cluster (which >>> partitions have to be associated with so we can issue the correct >>> release calls). >>> >>> 12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is >>> there a// >>> //reason we use `:` instead?/ >>> >>> That's netty syntax for path parameters. >>> >>> On 30/09/2019 08:34, Becket Qin wrote: >>>> Forgot to say that I agree with Till that it seems a good idea to let TEs >>>> register the global partitions to the RM instead of letting JM do it. >>> This >>>> simplifies quite a few things. >>>> >>>> Thanks, >>>> >>>> Jiangjie (Becket) Qin >>>> >>>> On Sun, Sep 29, 2019 at 11:25 PM Becket Qin<[hidden email]> <mailto:[hidden email]> >>> wrote: >>>>> Hi Chesnay, >>>>> >>>>> Thanks for the proposal. My understanding of the entire workflow step by >>>>> step is following: >>>>> >>>>> - JM maintains the local and global partition metadata when the task >>>>> runs to create result partitions. The tasks themselves does not >>> distinguish >>>>> between local / global partitions. Only the JM knows that. >>>>> - JM releases the local partitions as the job executes. When a job >>>>> finishes successfully, JM registers the global partitions to the RM. The >>>>> global partition IDs are set on the client instead of randomly >>> generated, >>>>> so the client can release global partitions using them. (It would be >>> good >>>>> to have some human readable string associated with the global result >>>>> partitions). >>>>> - Client issues REST call to list / release global partitions. >>>>> >>>>> A few thoughts / questions below: >>>>> 1. Failure cases: >>>>> * The TEs may remove the result partition while the RM does >>> not >>>>> know. In this case, the client will receive a runtime error and submit >>> the >>>>> full DAG to recompute the missing result partition. In this case, RM >>> should >>>>> release the incomplete global partition. How would RM be notified to do >>>>> that? >>>>> * Is it possible the RM looses global partition metadata >>> while >>>>> the TE still host the data? For example, RM deletes the global partition >>>>> entry while the release partition call to TE failed. >>>>> * What would happen if the JM fails before the global >>> partitions >>>>> are registered to RM? Are users exposed to resource leak if JM does not >>>>> have HA? >>>>> * What would happen if the RM fails? Will TE release the >>>>> partitions by themselves? >>>>> >>>>> 2. It looks that TE should be the source of truth of the result >>> partition >>>>> existence. Does it have to distinguish between global and local result >>>>> partitions? If TE does not need to distinguish them, it seems the the >>>>> releasePartition() method in TE could just provide the list of >>> partitions >>>>> to release, without the partitions to promote. >>>>> >>>>> 3. In the current design, RM should be able to release result >>>>> partitions using ShuffleService. Will RM do this by sending RPC to the >>> TEs? >>>>> Or will the RM do it by itself? >>>>> >>>>> 4. How do we plan to handle the case when there are different shuffle >>>>> services in the same Flink cluster? For example, a shared standalone >>>>> cluster. >>>>> >>>>> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a >>>>> reason we use `:` instead? >>>>> >>>>> Thanks, >>>>> >>>>> Jiangjie (Becket) Qin >>>>> >>>>> On Tue, Sep 17, 2019 at 3:22 AM zhijiang >>>>> <[hidden email]> <mailto:[hidden email]> wrote: >>>>> >>>>>> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on >>> my >>>>>> side. >>>>>> >>>>>> I also have some similar concerns which Till already proposed before. >>>>>> >>>>>> 1. The consistent terminology in different components. On JM side, >>>>>> PartitionTracker#getPersistedBlockingPartitions is defined for getting >>>>>> global partitions. And on RM side, we define the method of >>>>>> #registerGlobalPartitions correspondingly for handover the partitions >>> from >>>>>> JM. I think it is better to unify the term in different components for >>> for >>>>>> better understanding the semantic. Concering whether to use global or >>>>>> persistent, I prefer the "global" term personally. Because it >>> describes the >>>>>> scope of partition clearly, and the "persistent" is more like the >>> partition >>>>>> storing way or implementation detail. In other words, the global >>> partition >>>>>> might also be cached in memory of TE, not must persist into files from >>>>>> semantic requirements. Whether memory or persistent file is just the >>>>>> implementation choice. >>>>>> >>>>>> 2. On TE side, we might rename the method #releasePartitions to >>>>>> #releaseOrPromotePartitions which describes the function precisely and >>>>>> keeps consistent with >>>>>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). >>>>>> >>>>>> 3. Very agree with Till's suggestions of global PartitionTable on TE >>> side >>>>>> and sticking to TE's heartbeat report to RM for global partitions. >>>>>> >>>>>> 4. Considering ShuffleMaster, it was built inside JM and expected to >>>>>> interactive with JM before. Now the RM also needs to interactive with >>>>>> ShuffleMaster to release global partitions. Then it might be better to >>> move >>>>>> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should >>> be >>>>>> consistent with RM. >>>>>> >>>>>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global >>>>>> partitions for successful jobs" >>>>>> >>>>>> Best, >>>>>> Zhijiang >>>>>> >>>>>> >>>>>> ------------------------------------------------------------------ >>>>>> From:Till Rohrmann<[hidden email]> <mailto:[hidden email]> >>>>>> Send Time:2019年9月10日(星期二) 10:10 >>>>>> To:dev<[hidden email]> <mailto:[hidden email]> >>>>>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle >>>>>> >>>>>> Thanks Chesnay for drafting the FLIP and starting this discussion. >>>>>> >>>>>> I have a couple of comments: >>>>>> >>>>>> * I know that I've also coined the terms global/local result partition >>> but >>>>>> maybe it is not the perfect name. Maybe we could rethink the >>> terminology >>>>>> and call them persistent result partitions? >>>>>> * Nit: I would call the last parameter of void releasePartitions(JobID >>>>>> jobId, Collection<ResultPartitionID> partitionsToRelease, >>>>>> Collection<ResultPartitionID> partitionsToPromote) either >>>>>> partitionsToRetain or partitionsToPersistent. >>>>>> * I'm not sure whether partitionsToRelease should contain a >>>>>> global/persistent result partition id. I always thought that the user >>> will >>>>>> be responsible for managing the lifecycle of a global/persistent >>>>>> result partition. >>>>>> * Instead of extending the PartitionTable to be able to store >>>>>> global/persistent and local/transient result partitions, I would rather >>>>>> introduce a global PartitionTable to store the global/persistent result >>>>>> partitions explicitly. I think there is a benefit in making things as >>>>>> explicit as possible. >>>>>> * The handover logic between the JM and the RM for the >>> global/persistent >>>>>> result partitions seems a bit brittle to me. What will happen if the JM >>>>>> cannot reach the RM? I think it would be better if the TM announces the >>>>>> global/persistent result partitions to the RM via its heartbeats. That >>> way >>>>>> we don't rely on an established connection between the JM and RM and we >>>>>> keep the TM as the ground of truth. Moreover, the RM should simply >>> forward >>>>>> the release calls to the TM without much internal logic. >>>>>> >>>>>> Cheers, >>>>>> Till >>>>>> >>>>>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler<[hidden email]> <mailto:[hidden email]> >>>>>> wrote: >>>>>> >>>>>>> Hello, >>>>>>> >>>>>>> FLIP-36 (interactive programming) >>>>>>> < >>>>>>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink >>>>>>> proposes a new programming paradigm where jobs are built incrementally >>>>>>> by the user. >>>>>>> >>>>>>> To support this in an efficient manner I propose to extend partition >>>>>>> life-cycle to support the notion of /global partitions/, which are >>>>>>> partitions that can exist beyond the life-time of a job. >>>>>>> >>>>>>> These partitions could then be re-used by subsequent jobs in a fairly >>>>>>> efficient manner, as they don't have to persisted to an external >>> storage >>>>>>> first and consuming tasks could be scheduled to exploit data-locality. >>>>>>> >>>>>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor >>>>>>> and ResourceManager to support this from a life-cycle perspective. >>>>>>> >>>>>>> This FLIP does /not/ concern itself with the /usage/ of global >>>>>>> partitions, including client-side APIs, job-submission, scheduling and >>>>>>> reading said partitions; these are all follow-ups that will either be >>>>>>> part of FLIP-36 or spliced out into separate FLIPs. >>>>>>> >>>>>>> > |
Thanks for updating the FLIP.
I think the RM does not need to have access to a full fledged ShuffleMaster implementation. Instead it should enough to give it a leaner interface which only supports to delete result partitions and list available global partitions. This might entail that one will have a ShuffleMaster implementation running on the Dispatcher and a GlobalResultPartitionsShuffleMaster implementation running on the RM. Long story short, if we separate the RM from the Dispatcher, then this might entail that we will have two ShuffleMaster incarnations running in each process. Cheers, Till On Fri, Oct 4, 2019 at 3:34 PM Chesnay Schepler <[hidden email]> wrote: > I have updated the FLIP. > > - consistently use "local"/"global" terminology; this incidentally should > make it easier to update the terminology if we decide on other names > - inform RM via heartbeats from TE about available global partitions > - add dedicated method for releasing global partitions > - add dedicated section for required changes to the ShuffleMaster (mostly > clarification) > - added some items to the "Rejected Alternatives" section > - updated discussion link > > > While writing the ShuffleMaster section I noticed the following: > > If, at any point, the JM/RM are moved into dedicated processes we either > a) have multiple ShuffleMaster instances for the same shuffle service > active > b) require a single ShuffleMaster on the RM, to which JM calls are being > forwarded. > > Neither of these are without pain-points; > a) introduces additional constraints on ShuffleMaster implementations in > that no local state must be kept > b) again forces the JM to regularly be in touch with the RM, and limits > the ShuffleMaster interface to being RPC-friendly. > > I'm wondering whether this issue was already an anyone's radar. > > > On 04/10/2019 14:12, Till Rohrmann wrote: > > > > On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler <[hidden email]> > wrote: > >> *Till: In the FLIP you wrote "The set of partitions to release may contain local >> and/or global partitions; the promotion set must only refer to local >> partitions." to describe the `releasePartitions`. I think the JM should >> never be in the situation to release a global partition. Moreover, I >> believe we should have a separate RPC to release global result partitions >> which might come from the RM.* >> >> We can certainly add a separate RPC method for explicitly releasing global partitions. >> You are correct that the JM should not be able to release those, just like the RM should not be able to release non-global partitions. >> *Till: Once the JM has obtained the required slots to run a job, it no longer >> needs to communicate with the RM. Hence, a lost RM connection won't >> interfere with the job. I would like to keep it like this by letting the TE >> announce global result partitions to the RM and not to introduce another >> communication roundtrip. >> >> *Agreed, this is a nice property to retain. >> *Till: How big do you expect the payload to become? >> >> *I don't know, which is precisely why I want to be cautious about it. >> The last time I made a similar assumption I didn't expect anyone to have hundreds of thousands of metrics on a single TM, which turned out to be wrong. >> I wouldn't exclude the possibility of a similar number of partitions being hosted on a single TE. >> >> >> One problem we have to solve with the heartbeat-based approach is that partitions may be lost without the TE noticing, due to disk-failures or external delete operations. >> Currently, for scheduling purposes we rely on information stored in the JM, and update said information if a job fails due to a missing partition. However, IIRC the JM is informed about with an exception that is thrown by the consumer of said partition, not the producer. As far as the producing TM is concerned, it is still hosting that partition. >> This means we have to forward errors for missing partitions from the network stack on the producers side to the TE, so that it can inform the RM about it. >> >> > Yes, I think you are right Chesnay. This would also be a good addition for > the local result partitions. > > Cheers, > Till > >> On 02/10/2019 16:21, Till Rohrmann wrote: >> >> Thanks for addressing our comments Chesnay. See some comments inline. >> >> On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <[hidden email]> <[hidden email]> wrote: >> >> >> Thank you for your comments; I've aggregated them a bit and added >> comments to each of them. >> >> 1) Concept name (proposal: persistent) >> >> I agree that "global" is rather undescriptive, particularly so since we >> never had a notion of "local" partitions. >> I'm not a fan of "persistent"; as to me this always implies reliable >> long-term storage which as I understand we aren't shooting for here. >> >> I was thinking of "cached" partitions. >> >> To Zhijiangs point, we should of course make the naming consistent >> everywhere. >> >> 2) Naming of last parameter of TE#releasePartitions (proposal: >> partitionsToRetain / partitionsToPersistent) >> >> I can see where you're coming from ("promote" is somewhat abstract), but >> I think both suggestions have downsides. >> >> "partitionsToPersistent" to me implies an additional write operation to >> somewhere, but we aren't doing that. >> "partitionsToRetain" kind of results in a redundancy with the other >> argument since retaining is the opposite to releasing a partition; if I >> want to retain a partition, why am I not just excluding it from the set >> to release? >> >> I quite like "promote" personally; we fundamentally change how the >> lifecycle for these partitions work, and introducing new keywords isn't >> a inherently a bad choice. >> >> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; >> Note: addition of "OrPromote" is dependent on 2) ) >> >> Good point. >> >> 4) /Till: I'm not sure whether partitionsToRelease should contain a// >> //global/persistent result partition id. I always thought that the user >> will// >> //be responsible for managing the lifecycle of a global/persistent// >> //result partition./ >> >> @Till Please elaborate; which method/argument are you referring to? >> >> >> In the FLIP you wrote "The set of partitions to release may contain local >> and/or global partitions; the promotion set must only refer to local >> partitions." to describe the `releasePartitions`. I think the JM should >> never be in the situation to release a global partition. Moreover, I >> believe we should have a separate RPC to release global result partitions >> which might come from the RM. >> >> >> 4)/Dedicated PartitionTable for global partitions/ >> >> Since there is only one RM for each TE a PartitionTable is unnecessary; >> a simple set will suffice. >> Alternatively, we could introduce such a dedicated set into the >> PartitionTable to keep these data-structures close. >> >> 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs >> retain global partitions for successful jobs"/ >> >> Will fix it. >> >> 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and >> expected to interactive with JM before. Now the RM also needs to >> interactive with ShuffleMaster to release global partitions. Then it >> might be better to move ShuffleMaster outside of JM, and the lifecycle >> of ShuffleMaster should be consistent with RM./ >> >> Yes, I alluded to this in the FLIP but should've been more explicit; the >> shuffle master must outlive the JM. This is somewhat tricky when >> considering the future a bit; if we assume that different jobs or even a >> single one can use different shuffle services, then we need a way to >> associate the partitions with the corresponding shuffle master. This >> will likely require the introduction of a ShuffleMasterID that is >> included in the ShuffleDescriptor. >> >> 7) Handover >> >> /Till: The handover logic between the JM and the RM for the >> global/persistent// >> //result partitions seems a bit brittle to me. What will happen if the JM// >> //cannot reach the RM? I think it would be better if the TM announces the// >> //global/persistent result partitions to the RM via its heartbeats. That >> way// >> //we don't rely on an established connection between the JM and RM and we// >> //keep the TM as the ground of truth. Moreover, the RM should simply >> forward// >> //the release calls to the TM without much internal logic./ >> >> As for your question, if the JM cannot reach the RM the handover will >> fail, the JM will likely shutdown without promoting any partition and >> the TE will release all partitions. >> What is the defined behavior for the JM in case of the RM disconnect >> after a job has finished? Does it always/sometimes/never shutdown >> with/-out communicating the result to the client / updating HA data; >> or simply put, does the JM behave to the user as if nothing has happened >> in all cases? >> >> >> Once the JM has obtained the required slots to run a job, it no longer >> needs to communicate with the RM. Hence, a lost RM connection won't >> interfere with the job. I would like to keep it like this by letting the TE >> announce global result partitions to the RM and not to introduce another >> communication roundtrip. >> >> >> A heartbeat-based approach is useful and can alleviate some failure >> cases (see below); but we need to make sure we don't exceed the akka >> framesize or otherwise interfere with the heartbeat mechanism (like we >> did with metrics in the past). Ideally we would only submit updates to >> the partition set (added/removed partitions), but I'm not sure if the >> heartbeats are reliable enough for this to work. >> >> >> How big do you expect the payload to become? >> >> >> 8. Failure cases: >> /Becket:/ >> /a) The TEs may remove the result partition while the RM does not// >> //know. In this case, the client will receive a runtime error and submit >> the// >> //full DAG to recompute the missing result partition. In this case, RM >> should// >> //release the incomplete global partition. How would RM be notified to do// >> //that?// >> //b) Is it possible the RM looses global partition metadata while// >> //the TE still host the data? For example, RM deletes the global >> partition// >> //entry while the release partition call to TE failed.// >> //c) What would happen if the JM fails before the global partitions// >> //are registered to RM? Are users exposed to resource leak if JM does not// >> //have HA?// >> //d) What would happen if the RM fails? Will TE release the// >> //partitions by themselves?/ >> >> 1.a) This is a good question that I haven't considered. This will likely >> require a heartbeat-like report of available partitions. >> >> >> The hearbeat based synchronization approach seems to crystalize as the way >> to go forward with this FLIP. >> >> >> >> 1.b) RM should only delete entries if it received an ack from the TE; >> otherwise we could easily end up leaking partitions. I believe I forgot >> writing this down. >> 1.c) As described in the FLIP the handoff to the RM must occur before >> partitions are promoted. >> If the JM fails during the handoff then the TE will cleanup all >> partitions since it lost the connection to the JM, and partitions >> weren't promoted yet. >> If the JM fails after the handoff but before the promotion, same as >> above. The RM would contain invalid entries in this case; see 1.a) . >> If the JM fails after the handoff and promotion partitions we don't >> leak anything since the RM is now fully responsible. >> 1.d) yes; if the connection to the RM is disrupted the TE will cleanup >> all global partitions, similar to how it cleans up all partitions >> associated with a given job if the connection to the corresponding JM is >> disrupted. >> >> 9. /Becket: It looks that TE should be the source of truth of the result >> partition// >> //existence. Does it have to distinguish between global and local result// >> //partitions? If TE does not need to distinguish them, it seems the the// >> //releasePartition() method in TE could just provide the list of >> partitions// >> //to release, without the partitions to promote./ >> >> The promotion is a hard requirement, as this is the signal to the TE >> that this partition is no longer bound to the life-cycle of a job. >> Without the promotion the TE would delete the partition once the JM has >> shutdown; this is a safety net to ensure cleanup of partitions in case >> of a disconnect. >> >> 10. /In the current design, RM should be able to release result// >> //partitions using ShuffleService. Will RM do this by sending RPC to the >> TEs?// >> //Or will the RM do it by itself?/ >> >> The RM will send a release call to each TM and issue a release call to >> the ShuffleMaster, just like the JobMaster handles partition releases. >> >> 11. /Becket: How do we plan to handle the case when there are different >> shuffle// >> //services in the same Flink cluster? For example, a shared standalone// >> //cluster./ >> >> This case is not considered; there are so many changes necessary in >> other parts of the runtime that we would jump the gun in addressing it >> here. >> Ultimately though, I would think that that the addition of a shuffle >> master instance ID and shuffle service identifier should suffice. >> >> The identifier is used in subsequent jobs to load the appropriate >> shuffle service for a given partition (think of it like a class name), >> while the shuffle master instance ID is used to differentiate between >> the different shuffle master instances running in the cluster (which >> partitions have to be associated with so we can issue the correct >> release calls). >> >> 12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is >> there a// >> //reason we use `:` instead?/ >> >> That's netty syntax for path parameters. >> >> On 30/09/2019 08:34, Becket Qin wrote: >> >> Forgot to say that I agree with Till that it seems a good idea to let TEs >> register the global partitions to the RM instead of letting JM do it. >> >> This >> >> simplifies quite a few things. >> >> Thanks, >> >> Jiangjie (Becket) Qin >> >> On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <[hidden email]> <[hidden email]> >> >> wrote: >> >> Hi Chesnay, >> >> Thanks for the proposal. My understanding of the entire workflow step by >> step is following: >> >> - JM maintains the local and global partition metadata when the task >> runs to create result partitions. The tasks themselves does not >> >> distinguish >> >> between local / global partitions. Only the JM knows that. >> - JM releases the local partitions as the job executes. When a job >> finishes successfully, JM registers the global partitions to the RM. The >> global partition IDs are set on the client instead of randomly >> >> generated, >> >> so the client can release global partitions using them. (It would be >> >> good >> >> to have some human readable string associated with the global result >> partitions). >> - Client issues REST call to list / release global partitions. >> >> A few thoughts / questions below: >> 1. Failure cases: >> * The TEs may remove the result partition while the RM does >> >> not >> >> know. In this case, the client will receive a runtime error and submit >> >> the >> >> full DAG to recompute the missing result partition. In this case, RM >> >> should >> >> release the incomplete global partition. How would RM be notified to do >> that? >> * Is it possible the RM looses global partition metadata >> >> while >> >> the TE still host the data? For example, RM deletes the global partition >> entry while the release partition call to TE failed. >> * What would happen if the JM fails before the global >> >> partitions >> >> are registered to RM? Are users exposed to resource leak if JM does not >> have HA? >> * What would happen if the RM fails? Will TE release the >> partitions by themselves? >> >> 2. It looks that TE should be the source of truth of the result >> >> partition >> >> existence. Does it have to distinguish between global and local result >> partitions? If TE does not need to distinguish them, it seems the the >> releasePartition() method in TE could just provide the list of >> >> partitions >> >> to release, without the partitions to promote. >> >> 3. In the current design, RM should be able to release result >> partitions using ShuffleService. Will RM do this by sending RPC to the >> >> TEs? >> >> Or will the RM do it by itself? >> >> 4. How do we plan to handle the case when there are different shuffle >> services in the same Flink cluster? For example, a shared standalone >> cluster. >> >> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a >> reason we use `:` instead? >> >> Thanks, >> >> Jiangjie (Becket) Qin >> >> On Tue, Sep 17, 2019 at 3:22 AM zhijiang<[hidden email]> <[hidden email]> wrote: >> >> >> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on >> >> my >> >> side. >> >> I also have some similar concerns which Till already proposed before. >> >> 1. The consistent terminology in different components. On JM side, >> PartitionTracker#getPersistedBlockingPartitions is defined for getting >> global partitions. And on RM side, we define the method of >> #registerGlobalPartitions correspondingly for handover the partitions >> >> from >> >> JM. I think it is better to unify the term in different components for >> >> for >> >> better understanding the semantic. Concering whether to use global or >> persistent, I prefer the "global" term personally. Because it >> >> describes the >> >> scope of partition clearly, and the "persistent" is more like the >> >> partition >> >> storing way or implementation detail. In other words, the global >> >> partition >> >> might also be cached in memory of TE, not must persist into files from >> semantic requirements. Whether memory or persistent file is just the >> implementation choice. >> >> 2. On TE side, we might rename the method #releasePartitions to >> #releaseOrPromotePartitions which describes the function precisely and >> keeps consistent with >> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). >> >> 3. Very agree with Till's suggestions of global PartitionTable on TE >> >> side >> >> and sticking to TE's heartbeat report to RM for global partitions. >> >> 4. Considering ShuffleMaster, it was built inside JM and expected to >> interactive with JM before. Now the RM also needs to interactive with >> ShuffleMaster to release global partitions. Then it might be better to >> >> move >> >> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should >> >> be >> >> consistent with RM. >> >> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global >> partitions for successful jobs" >> >> Best, >> Zhijiang >> >> >> ------------------------------------------------------------------ >> From:Till Rohrmann <[hidden email]> <[hidden email]> >> Send Time:2019年9月10日(星期二) 10:10 >> To:dev <[hidden email]> <[hidden email]> >> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle >> >> Thanks Chesnay for drafting the FLIP and starting this discussion. >> >> I have a couple of comments: >> >> * I know that I've also coined the terms global/local result partition >> >> but >> >> maybe it is not the perfect name. Maybe we could rethink the >> >> terminology >> >> and call them persistent result partitions? >> * Nit: I would call the last parameter of void releasePartitions(JobID >> jobId, Collection<ResultPartitionID> partitionsToRelease, >> Collection<ResultPartitionID> partitionsToPromote) either >> partitionsToRetain or partitionsToPersistent. >> * I'm not sure whether partitionsToRelease should contain a >> global/persistent result partition id. I always thought that the user >> >> will >> >> be responsible for managing the lifecycle of a global/persistent >> result partition. >> * Instead of extending the PartitionTable to be able to store >> global/persistent and local/transient result partitions, I would rather >> introduce a global PartitionTable to store the global/persistent result >> partitions explicitly. I think there is a benefit in making things as >> explicit as possible. >> * The handover logic between the JM and the RM for the >> >> global/persistent >> >> result partitions seems a bit brittle to me. What will happen if the JM >> cannot reach the RM? I think it would be better if the TM announces the >> global/persistent result partitions to the RM via its heartbeats. That >> >> way >> >> we don't rely on an established connection between the JM and RM and we >> keep the TM as the ground of truth. Moreover, the RM should simply >> >> forward >> >> the release calls to the TM without much internal logic. >> >> Cheers, >> Till >> >> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> <[hidden email]> >> wrote: >> >> >> Hello, >> >> FLIP-36 (interactive programming) >> < >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink >> >> proposes a new programming paradigm where jobs are built incrementally >> by the user. >> >> To support this in an efficient manner I propose to extend partition >> life-cycle to support the notion of /global partitions/, which are >> partitions that can exist beyond the life-time of a job. >> >> These partitions could then be re-used by subsequent jobs in a fairly >> efficient manner, as they don't have to persisted to an external >> >> storage >> >> first and consuming tasks could be scheduled to exploit data-locality. >> >> The FLIP outlines the required changes on the JobMaster, TaskExecutor >> and ResourceManager to support this from a life-cycle perspective. >> >> This FLIP does /not/ concern itself with the /usage/ of global >> partitions, including client-side APIs, job-submission, scheduling and >> reading said partitions; these are all follow-ups that will either be >> part of FLIP-36 or spliced out into separate FLIPs. >> >> >> >> >> > |
So should we enforce having 2 instances now or defer this to a later date?
I'd rather do this early since it changes 2 assumptions that ShuffleMaster can currently make: - every partition release is preceded by a registration of said partition - the release of partitions may rely on local data On 04/10/2019 17:10, Till Rohrmann wrote: > Thanks for updating the FLIP. > > I think the RM does not need to have access to a full fledged ShuffleMaster > implementation. Instead it should enough to give it a leaner interface > which only supports to delete result partitions and list available global > partitions. This might entail that one will have a ShuffleMaster > implementation running on the Dispatcher and a > GlobalResultPartitionsShuffleMaster implementation running on the RM. Long > story short, if we separate the RM from the Dispatcher, then this might > entail that we will have two ShuffleMaster incarnations running in each > process. > > Cheers, > Till > > On Fri, Oct 4, 2019 at 3:34 PM Chesnay Schepler <[hidden email]> wrote: > >> I have updated the FLIP. >> >> - consistently use "local"/"global" terminology; this incidentally should >> make it easier to update the terminology if we decide on other names >> - inform RM via heartbeats from TE about available global partitions >> - add dedicated method for releasing global partitions >> - add dedicated section for required changes to the ShuffleMaster (mostly >> clarification) >> - added some items to the "Rejected Alternatives" section >> - updated discussion link >> >> >> While writing the ShuffleMaster section I noticed the following: >> >> If, at any point, the JM/RM are moved into dedicated processes we either >> a) have multiple ShuffleMaster instances for the same shuffle service >> active >> b) require a single ShuffleMaster on the RM, to which JM calls are being >> forwarded. >> >> Neither of these are without pain-points; >> a) introduces additional constraints on ShuffleMaster implementations in >> that no local state must be kept >> b) again forces the JM to regularly be in touch with the RM, and limits >> the ShuffleMaster interface to being RPC-friendly. >> >> I'm wondering whether this issue was already an anyone's radar. >> >> >> On 04/10/2019 14:12, Till Rohrmann wrote: >> >> >> >> On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler <[hidden email]> >> wrote: >> >>> *Till: In the FLIP you wrote "The set of partitions to release may contain local >>> and/or global partitions; the promotion set must only refer to local >>> partitions." to describe the `releasePartitions`. I think the JM should >>> never be in the situation to release a global partition. Moreover, I >>> believe we should have a separate RPC to release global result partitions >>> which might come from the RM.* >>> >>> We can certainly add a separate RPC method for explicitly releasing global partitions. >>> You are correct that the JM should not be able to release those, just like the RM should not be able to release non-global partitions. >>> *Till: Once the JM has obtained the required slots to run a job, it no longer >>> needs to communicate with the RM. Hence, a lost RM connection won't >>> interfere with the job. I would like to keep it like this by letting the TE >>> announce global result partitions to the RM and not to introduce another >>> communication roundtrip. >>> >>> *Agreed, this is a nice property to retain. >>> *Till: How big do you expect the payload to become? >>> >>> *I don't know, which is precisely why I want to be cautious about it. >>> The last time I made a similar assumption I didn't expect anyone to have hundreds of thousands of metrics on a single TM, which turned out to be wrong. >>> I wouldn't exclude the possibility of a similar number of partitions being hosted on a single TE. >>> >>> >>> One problem we have to solve with the heartbeat-based approach is that partitions may be lost without the TE noticing, due to disk-failures or external delete operations. >>> Currently, for scheduling purposes we rely on information stored in the JM, and update said information if a job fails due to a missing partition. However, IIRC the JM is informed about with an exception that is thrown by the consumer of said partition, not the producer. As far as the producing TM is concerned, it is still hosting that partition. >>> This means we have to forward errors for missing partitions from the network stack on the producers side to the TE, so that it can inform the RM about it. >>> >>> >> Yes, I think you are right Chesnay. This would also be a good addition for >> the local result partitions. >> >> Cheers, >> Till >> >>> On 02/10/2019 16:21, Till Rohrmann wrote: >>> >>> Thanks for addressing our comments Chesnay. See some comments inline. >>> >>> On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <[hidden email]> <[hidden email]> wrote: >>> >>> >>> Thank you for your comments; I've aggregated them a bit and added >>> comments to each of them. >>> >>> 1) Concept name (proposal: persistent) >>> >>> I agree that "global" is rather undescriptive, particularly so since we >>> never had a notion of "local" partitions. >>> I'm not a fan of "persistent"; as to me this always implies reliable >>> long-term storage which as I understand we aren't shooting for here. >>> >>> I was thinking of "cached" partitions. >>> >>> To Zhijiangs point, we should of course make the naming consistent >>> everywhere. >>> >>> 2) Naming of last parameter of TE#releasePartitions (proposal: >>> partitionsToRetain / partitionsToPersistent) >>> >>> I can see where you're coming from ("promote" is somewhat abstract), but >>> I think both suggestions have downsides. >>> >>> "partitionsToPersistent" to me implies an additional write operation to >>> somewhere, but we aren't doing that. >>> "partitionsToRetain" kind of results in a redundancy with the other >>> argument since retaining is the opposite to releasing a partition; if I >>> want to retain a partition, why am I not just excluding it from the set >>> to release? >>> >>> I quite like "promote" personally; we fundamentally change how the >>> lifecycle for these partitions work, and introducing new keywords isn't >>> a inherently a bad choice. >>> >>> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; >>> Note: addition of "OrPromote" is dependent on 2) ) >>> >>> Good point. >>> >>> 4) /Till: I'm not sure whether partitionsToRelease should contain a// >>> //global/persistent result partition id. I always thought that the user >>> will// >>> //be responsible for managing the lifecycle of a global/persistent// >>> //result partition./ >>> >>> @Till Please elaborate; which method/argument are you referring to? >>> >>> >>> In the FLIP you wrote "The set of partitions to release may contain local >>> and/or global partitions; the promotion set must only refer to local >>> partitions." to describe the `releasePartitions`. I think the JM should >>> never be in the situation to release a global partition. Moreover, I >>> believe we should have a separate RPC to release global result partitions >>> which might come from the RM. >>> >>> >>> 4)/Dedicated PartitionTable for global partitions/ >>> >>> Since there is only one RM for each TE a PartitionTable is unnecessary; >>> a simple set will suffice. >>> Alternatively, we could introduce such a dedicated set into the >>> PartitionTable to keep these data-structures close. >>> >>> 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs >>> retain global partitions for successful jobs"/ >>> >>> Will fix it. >>> >>> 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and >>> expected to interactive with JM before. Now the RM also needs to >>> interactive with ShuffleMaster to release global partitions. Then it >>> might be better to move ShuffleMaster outside of JM, and the lifecycle >>> of ShuffleMaster should be consistent with RM./ >>> >>> Yes, I alluded to this in the FLIP but should've been more explicit; the >>> shuffle master must outlive the JM. This is somewhat tricky when >>> considering the future a bit; if we assume that different jobs or even a >>> single one can use different shuffle services, then we need a way to >>> associate the partitions with the corresponding shuffle master. This >>> will likely require the introduction of a ShuffleMasterID that is >>> included in the ShuffleDescriptor. >>> >>> 7) Handover >>> >>> /Till: The handover logic between the JM and the RM for the >>> global/persistent// >>> //result partitions seems a bit brittle to me. What will happen if the JM// >>> //cannot reach the RM? I think it would be better if the TM announces the// >>> //global/persistent result partitions to the RM via its heartbeats. That >>> way// >>> //we don't rely on an established connection between the JM and RM and we// >>> //keep the TM as the ground of truth. Moreover, the RM should simply >>> forward// >>> //the release calls to the TM without much internal logic./ >>> >>> As for your question, if the JM cannot reach the RM the handover will >>> fail, the JM will likely shutdown without promoting any partition and >>> the TE will release all partitions. >>> What is the defined behavior for the JM in case of the RM disconnect >>> after a job has finished? Does it always/sometimes/never shutdown >>> with/-out communicating the result to the client / updating HA data; >>> or simply put, does the JM behave to the user as if nothing has happened >>> in all cases? >>> >>> >>> Once the JM has obtained the required slots to run a job, it no longer >>> needs to communicate with the RM. Hence, a lost RM connection won't >>> interfere with the job. I would like to keep it like this by letting the TE >>> announce global result partitions to the RM and not to introduce another >>> communication roundtrip. >>> >>> >>> A heartbeat-based approach is useful and can alleviate some failure >>> cases (see below); but we need to make sure we don't exceed the akka >>> framesize or otherwise interfere with the heartbeat mechanism (like we >>> did with metrics in the past). Ideally we would only submit updates to >>> the partition set (added/removed partitions), but I'm not sure if the >>> heartbeats are reliable enough for this to work. >>> >>> >>> How big do you expect the payload to become? >>> >>> >>> 8. Failure cases: >>> /Becket:/ >>> /a) The TEs may remove the result partition while the RM does not// >>> //know. In this case, the client will receive a runtime error and submit >>> the// >>> //full DAG to recompute the missing result partition. In this case, RM >>> should// >>> //release the incomplete global partition. How would RM be notified to do// >>> //that?// >>> //b) Is it possible the RM looses global partition metadata while// >>> //the TE still host the data? For example, RM deletes the global >>> partition// >>> //entry while the release partition call to TE failed.// >>> //c) What would happen if the JM fails before the global partitions// >>> //are registered to RM? Are users exposed to resource leak if JM does not// >>> //have HA?// >>> //d) What would happen if the RM fails? Will TE release the// >>> //partitions by themselves?/ >>> >>> 1.a) This is a good question that I haven't considered. This will likely >>> require a heartbeat-like report of available partitions. >>> >>> >>> The hearbeat based synchronization approach seems to crystalize as the way >>> to go forward with this FLIP. >>> >>> >>> >>> 1.b) RM should only delete entries if it received an ack from the TE; >>> otherwise we could easily end up leaking partitions. I believe I forgot >>> writing this down. >>> 1.c) As described in the FLIP the handoff to the RM must occur before >>> partitions are promoted. >>> If the JM fails during the handoff then the TE will cleanup all >>> partitions since it lost the connection to the JM, and partitions >>> weren't promoted yet. >>> If the JM fails after the handoff but before the promotion, same as >>> above. The RM would contain invalid entries in this case; see 1.a) . >>> If the JM fails after the handoff and promotion partitions we don't >>> leak anything since the RM is now fully responsible. >>> 1.d) yes; if the connection to the RM is disrupted the TE will cleanup >>> all global partitions, similar to how it cleans up all partitions >>> associated with a given job if the connection to the corresponding JM is >>> disrupted. >>> >>> 9. /Becket: It looks that TE should be the source of truth of the result >>> partition// >>> //existence. Does it have to distinguish between global and local result// >>> //partitions? If TE does not need to distinguish them, it seems the the// >>> //releasePartition() method in TE could just provide the list of >>> partitions// >>> //to release, without the partitions to promote./ >>> >>> The promotion is a hard requirement, as this is the signal to the TE >>> that this partition is no longer bound to the life-cycle of a job. >>> Without the promotion the TE would delete the partition once the JM has >>> shutdown; this is a safety net to ensure cleanup of partitions in case >>> of a disconnect. >>> >>> 10. /In the current design, RM should be able to release result// >>> //partitions using ShuffleService. Will RM do this by sending RPC to the >>> TEs?// >>> //Or will the RM do it by itself?/ >>> >>> The RM will send a release call to each TM and issue a release call to >>> the ShuffleMaster, just like the JobMaster handles partition releases. >>> >>> 11. /Becket: How do we plan to handle the case when there are different >>> shuffle// >>> //services in the same Flink cluster? For example, a shared standalone// >>> //cluster./ >>> >>> This case is not considered; there are so many changes necessary in >>> other parts of the runtime that we would jump the gun in addressing it >>> here. >>> Ultimately though, I would think that that the addition of a shuffle >>> master instance ID and shuffle service identifier should suffice. >>> >>> The identifier is used in subsequent jobs to load the appropriate >>> shuffle service for a given partition (think of it like a class name), >>> while the shuffle master instance ID is used to differentiate between >>> the different shuffle master instances running in the cluster (which >>> partitions have to be associated with so we can issue the correct >>> release calls). >>> >>> 12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is >>> there a// >>> //reason we use `:` instead?/ >>> >>> That's netty syntax for path parameters. >>> >>> On 30/09/2019 08:34, Becket Qin wrote: >>> >>> Forgot to say that I agree with Till that it seems a good idea to let TEs >>> register the global partitions to the RM instead of letting JM do it. >>> >>> This >>> >>> simplifies quite a few things. >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <[hidden email]> <[hidden email]> >>> >>> wrote: >>> >>> Hi Chesnay, >>> >>> Thanks for the proposal. My understanding of the entire workflow step by >>> step is following: >>> >>> - JM maintains the local and global partition metadata when the task >>> runs to create result partitions. The tasks themselves does not >>> >>> distinguish >>> >>> between local / global partitions. Only the JM knows that. >>> - JM releases the local partitions as the job executes. When a job >>> finishes successfully, JM registers the global partitions to the RM. The >>> global partition IDs are set on the client instead of randomly >>> >>> generated, >>> >>> so the client can release global partitions using them. (It would be >>> >>> good >>> >>> to have some human readable string associated with the global result >>> partitions). >>> - Client issues REST call to list / release global partitions. >>> >>> A few thoughts / questions below: >>> 1. Failure cases: >>> * The TEs may remove the result partition while the RM does >>> >>> not >>> >>> know. In this case, the client will receive a runtime error and submit >>> >>> the >>> >>> full DAG to recompute the missing result partition. In this case, RM >>> >>> should >>> >>> release the incomplete global partition. How would RM be notified to do >>> that? >>> * Is it possible the RM looses global partition metadata >>> >>> while >>> >>> the TE still host the data? For example, RM deletes the global partition >>> entry while the release partition call to TE failed. >>> * What would happen if the JM fails before the global >>> >>> partitions >>> >>> are registered to RM? Are users exposed to resource leak if JM does not >>> have HA? >>> * What would happen if the RM fails? Will TE release the >>> partitions by themselves? >>> >>> 2. It looks that TE should be the source of truth of the result >>> >>> partition >>> >>> existence. Does it have to distinguish between global and local result >>> partitions? If TE does not need to distinguish them, it seems the the >>> releasePartition() method in TE could just provide the list of >>> >>> partitions >>> >>> to release, without the partitions to promote. >>> >>> 3. In the current design, RM should be able to release result >>> partitions using ShuffleService. Will RM do this by sending RPC to the >>> >>> TEs? >>> >>> Or will the RM do it by itself? >>> >>> 4. How do we plan to handle the case when there are different shuffle >>> services in the same Flink cluster? For example, a shared standalone >>> cluster. >>> >>> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a >>> reason we use `:` instead? >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Tue, Sep 17, 2019 at 3:22 AM zhijiang<[hidden email]> <[hidden email]> wrote: >>> >>> >>> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on >>> >>> my >>> >>> side. >>> >>> I also have some similar concerns which Till already proposed before. >>> >>> 1. The consistent terminology in different components. On JM side, >>> PartitionTracker#getPersistedBlockingPartitions is defined for getting >>> global partitions. And on RM side, we define the method of >>> #registerGlobalPartitions correspondingly for handover the partitions >>> >>> from >>> >>> JM. I think it is better to unify the term in different components for >>> >>> for >>> >>> better understanding the semantic. Concering whether to use global or >>> persistent, I prefer the "global" term personally. Because it >>> >>> describes the >>> >>> scope of partition clearly, and the "persistent" is more like the >>> >>> partition >>> >>> storing way or implementation detail. In other words, the global >>> >>> partition >>> >>> might also be cached in memory of TE, not must persist into files from >>> semantic requirements. Whether memory or persistent file is just the >>> implementation choice. >>> >>> 2. On TE side, we might rename the method #releasePartitions to >>> #releaseOrPromotePartitions which describes the function precisely and >>> keeps consistent with >>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). >>> >>> 3. Very agree with Till's suggestions of global PartitionTable on TE >>> >>> side >>> >>> and sticking to TE's heartbeat report to RM for global partitions. >>> >>> 4. Considering ShuffleMaster, it was built inside JM and expected to >>> interactive with JM before. Now the RM also needs to interactive with >>> ShuffleMaster to release global partitions. Then it might be better to >>> >>> move >>> >>> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should >>> >>> be >>> >>> consistent with RM. >>> >>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global >>> partitions for successful jobs" >>> >>> Best, >>> Zhijiang >>> >>> >>> ------------------------------------------------------------------ >>> From:Till Rohrmann <[hidden email]> <[hidden email]> >>> Send Time:2019年9月10日(星期二) 10:10 >>> To:dev <[hidden email]> <[hidden email]> >>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle >>> >>> Thanks Chesnay for drafting the FLIP and starting this discussion. >>> >>> I have a couple of comments: >>> >>> * I know that I've also coined the terms global/local result partition >>> >>> but >>> >>> maybe it is not the perfect name. Maybe we could rethink the >>> >>> terminology >>> >>> and call them persistent result partitions? >>> * Nit: I would call the last parameter of void releasePartitions(JobID >>> jobId, Collection<ResultPartitionID> partitionsToRelease, >>> Collection<ResultPartitionID> partitionsToPromote) either >>> partitionsToRetain or partitionsToPersistent. >>> * I'm not sure whether partitionsToRelease should contain a >>> global/persistent result partition id. I always thought that the user >>> >>> will >>> >>> be responsible for managing the lifecycle of a global/persistent >>> result partition. >>> * Instead of extending the PartitionTable to be able to store >>> global/persistent and local/transient result partitions, I would rather >>> introduce a global PartitionTable to store the global/persistent result >>> partitions explicitly. I think there is a benefit in making things as >>> explicit as possible. >>> * The handover logic between the JM and the RM for the >>> >>> global/persistent >>> >>> result partitions seems a bit brittle to me. What will happen if the JM >>> cannot reach the RM? I think it would be better if the TM announces the >>> global/persistent result partitions to the RM via its heartbeats. That >>> >>> way >>> >>> we don't rely on an established connection between the JM and RM and we >>> keep the TM as the ground of truth. Moreover, the RM should simply >>> >>> forward >>> >>> the release calls to the TM without much internal logic. >>> >>> Cheers, >>> Till >>> >>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <[hidden email]> <[hidden email]> >>> wrote: >>> >>> >>> Hello, >>> >>> FLIP-36 (interactive programming) >>> < >>> >>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink >>> >>> proposes a new programming paradigm where jobs are built incrementally >>> by the user. >>> >>> To support this in an efficient manner I propose to extend partition >>> life-cycle to support the notion of /global partitions/, which are >>> partitions that can exist beyond the life-time of a job. >>> >>> These partitions could then be re-used by subsequent jobs in a fairly >>> efficient manner, as they don't have to persisted to an external >>> >>> storage >>> >>> first and consuming tasks could be scheduled to exploit data-locality. >>> >>> The FLIP outlines the required changes on the JobMaster, TaskExecutor >>> and ResourceManager to support this from a life-cycle perspective. >>> >>> This FLIP does /not/ concern itself with the /usage/ of global >>> partitions, including client-side APIs, job-submission, scheduling and >>> reading said partitions; these are all follow-ups that will either be >>> part of FLIP-36 or spliced out into separate FLIPs. >>> >>> >>> >>> >>> |
While we could argue that it's a new interface so we aren't /technically
/changing anything about the ShuffleMaster, I'd assume most people would just have the ShuffleMaster implement the new interface and call it a day. On 09/10/2019 09:57, Chesnay Schepler wrote: > So should we enforce having 2 instances now or defer this to a later > date? > > I'd rather do this early since it changes 2 assumptions that > ShuffleMaster can currently make: > - every partition release is preceded by a registration of said partition > - the release of partitions may rely on local data > > On 04/10/2019 17:10, Till Rohrmann wrote: >> Thanks for updating the FLIP. >> >> I think the RM does not need to have access to a full fledged >> ShuffleMaster >> implementation. Instead it should enough to give it a leaner interface >> which only supports to delete result partitions and list available >> global >> partitions. This might entail that one will have a ShuffleMaster >> implementation running on the Dispatcher and a >> GlobalResultPartitionsShuffleMaster implementation running on the RM. >> Long >> story short, if we separate the RM from the Dispatcher, then this might >> entail that we will have two ShuffleMaster incarnations running in each >> process. >> >> Cheers, >> Till >> >> On Fri, Oct 4, 2019 at 3:34 PM Chesnay Schepler <[hidden email]> >> wrote: >> >>> I have updated the FLIP. >>> >>> - consistently use "local"/"global" terminology; this incidentally >>> should >>> make it easier to update the terminology if we decide on other names >>> - inform RM via heartbeats from TE about available global partitions >>> - add dedicated method for releasing global partitions >>> - add dedicated section for required changes to the ShuffleMaster >>> (mostly >>> clarification) >>> - added some items to the "Rejected Alternatives" section >>> - updated discussion link >>> >>> >>> While writing the ShuffleMaster section I noticed the following: >>> >>> If, at any point, the JM/RM are moved into dedicated processes we >>> either >>> a) have multiple ShuffleMaster instances for the same shuffle service >>> active >>> b) require a single ShuffleMaster on the RM, to which JM calls are >>> being >>> forwarded. >>> >>> Neither of these are without pain-points; >>> a) introduces additional constraints on ShuffleMaster >>> implementations in >>> that no local state must be kept >>> b) again forces the JM to regularly be in touch with the RM, and limits >>> the ShuffleMaster interface to being RPC-friendly. >>> >>> I'm wondering whether this issue was already an anyone's radar. >>> >>> >>> On 04/10/2019 14:12, Till Rohrmann wrote: >>> >>> >>> >>> On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler <[hidden email]> >>> wrote: >>> >>>> *Till: In the FLIP you wrote "The set of partitions to release may >>>> contain local >>>> and/or global partitions; the promotion set must only refer to local >>>> partitions." to describe the `releasePartitions`. I think the JM >>>> should >>>> never be in the situation to release a global partition. Moreover, I >>>> believe we should have a separate RPC to release global result >>>> partitions >>>> which might come from the RM.* >>>> >>>> We can certainly add a separate RPC method for explicitly releasing >>>> global partitions. >>>> You are correct that the JM should not be able to release those, >>>> just like the RM should not be able to release non-global partitions. >>>> *Till: Once the JM has obtained the required slots to run a job, it >>>> no longer >>>> needs to communicate with the RM. Hence, a lost RM connection won't >>>> interfere with the job. I would like to keep it like this by >>>> letting the TE >>>> announce global result partitions to the RM and not to introduce >>>> another >>>> communication roundtrip. >>>> >>>> *Agreed, this is a nice property to retain. >>>> *Till: How big do you expect the payload to become? >>>> >>>> *I don't know, which is precisely why I want to be cautious about it. >>>> The last time I made a similar assumption I didn't expect anyone to >>>> have hundreds of thousands of metrics on a single TM, which turned >>>> out to be wrong. >>>> I wouldn't exclude the possibility of a similar number of >>>> partitions being hosted on a single TE. >>>> >>>> >>>> One problem we have to solve with the heartbeat-based approach is >>>> that partitions may be lost without the TE noticing, due to >>>> disk-failures or external delete operations. >>>> Currently, for scheduling purposes we rely on information stored in >>>> the JM, and update said information if a job fails due to a missing >>>> partition. However, IIRC the JM is informed about with an exception >>>> that is thrown by the consumer of said partition, not the producer. >>>> As far as the producing TM is concerned, it is still hosting that >>>> partition. >>>> This means we have to forward errors for missing partitions from >>>> the network stack on the producers side to the TE, so that it can >>>> inform the RM about it. >>>> >>>> >>> Yes, I think you are right Chesnay. This would also be a good >>> addition for >>> the local result partitions. >>> >>> Cheers, >>> Till >>> >>>> On 02/10/2019 16:21, Till Rohrmann wrote: >>>> >>>> Thanks for addressing our comments Chesnay. See some comments inline. >>>> >>>> On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler >>>> <[hidden email]> <[hidden email]> wrote: >>>> >>>> >>>> Thank you for your comments; I've aggregated them a bit and added >>>> comments to each of them. >>>> >>>> 1) Concept name (proposal: persistent) >>>> >>>> I agree that "global" is rather undescriptive, particularly so >>>> since we >>>> never had a notion of "local" partitions. >>>> I'm not a fan of "persistent"; as to me this always implies reliable >>>> long-term storage which as I understand we aren't shooting for here. >>>> >>>> I was thinking of "cached" partitions. >>>> >>>> To Zhijiangs point, we should of course make the naming consistent >>>> everywhere. >>>> >>>> 2) Naming of last parameter of TE#releasePartitions (proposal: >>>> partitionsToRetain / partitionsToPersistent) >>>> >>>> I can see where you're coming from ("promote" is somewhat >>>> abstract), but >>>> I think both suggestions have downsides. >>>> >>>> "partitionsToPersistent" to me implies an additional write >>>> operation to >>>> somewhere, but we aren't doing that. >>>> "partitionsToRetain" kind of results in a redundancy with the other >>>> argument since retaining is the opposite to releasing a partition; >>>> if I >>>> want to retain a partition, why am I not just excluding it from the >>>> set >>>> to release? >>>> >>>> I quite like "promote" personally; we fundamentally change how the >>>> lifecycle for these partitions work, and introducing new keywords >>>> isn't >>>> a inherently a bad choice. >>>> >>>> 3) Naming of TE#releasePartitions (proposal: >>>> releaseOrPromotePartitions; >>>> Note: addition of "OrPromote" is dependent on 2) ) >>>> >>>> Good point. >>>> >>>> 4) /Till: I'm not sure whether partitionsToRelease should contain a// >>>> //global/persistent result partition id. I always thought that the >>>> user >>>> will// >>>> //be responsible for managing the lifecycle of a global/persistent// >>>> //result partition./ >>>> >>>> @Till Please elaborate; which method/argument are you referring to? >>>> >>>> >>>> In the FLIP you wrote "The set of partitions to release may contain >>>> local >>>> and/or global partitions; the promotion set must only refer to local >>>> partitions." to describe the `releasePartitions`. I think the JM >>>> should >>>> never be in the situation to release a global partition. Moreover, I >>>> believe we should have a separate RPC to release global result >>>> partitions >>>> which might come from the RM. >>>> >>>> >>>> 4)/Dedicated PartitionTable for global partitions/ >>>> >>>> Since there is only one RM for each TE a PartitionTable is >>>> unnecessary; >>>> a simple set will suffice. >>>> Alternatively, we could introduce such a dedicated set into the >>>> PartitionTable to keep these data-structures close. >>>> >>>> 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs >>>> retain global partitions for successful jobs"/ >>>> >>>> Will fix it. >>>> >>>> 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and >>>> expected to interactive with JM before. Now the RM also needs to >>>> interactive with ShuffleMaster to release global partitions. Then it >>>> might be better to move ShuffleMaster outside of JM, and the lifecycle >>>> of ShuffleMaster should be consistent with RM./ >>>> >>>> Yes, I alluded to this in the FLIP but should've been more >>>> explicit; the >>>> shuffle master must outlive the JM. This is somewhat tricky when >>>> considering the future a bit; if we assume that different jobs or >>>> even a >>>> single one can use different shuffle services, then we need a way to >>>> associate the partitions with the corresponding shuffle master. This >>>> will likely require the introduction of a ShuffleMasterID that is >>>> included in the ShuffleDescriptor. >>>> >>>> 7) Handover >>>> >>>> /Till: The handover logic between the JM and the RM for the >>>> global/persistent// >>>> //result partitions seems a bit brittle to me. What will happen if >>>> the JM// >>>> //cannot reach the RM? I think it would be better if the TM >>>> announces the// >>>> //global/persistent result partitions to the RM via its heartbeats. >>>> That >>>> way// >>>> //we don't rely on an established connection between the JM and RM >>>> and we// >>>> //keep the TM as the ground of truth. Moreover, the RM should simply >>>> forward// >>>> //the release calls to the TM without much internal logic./ >>>> >>>> As for your question, if the JM cannot reach the RM the handover will >>>> fail, the JM will likely shutdown without promoting any partition and >>>> the TE will release all partitions. >>>> What is the defined behavior for the JM in case of the RM disconnect >>>> after a job has finished? Does it always/sometimes/never shutdown >>>> with/-out communicating the result to the client / updating HA data; >>>> or simply put, does the JM behave to the user as if nothing has >>>> happened >>>> in all cases? >>>> >>>> >>>> Once the JM has obtained the required slots to run a job, it no longer >>>> needs to communicate with the RM. Hence, a lost RM connection won't >>>> interfere with the job. I would like to keep it like this by >>>> letting the TE >>>> announce global result partitions to the RM and not to introduce >>>> another >>>> communication roundtrip. >>>> >>>> >>>> A heartbeat-based approach is useful and can alleviate some failure >>>> cases (see below); but we need to make sure we don't exceed the akka >>>> framesize or otherwise interfere with the heartbeat mechanism (like we >>>> did with metrics in the past). Ideally we would only submit updates to >>>> the partition set (added/removed partitions), but I'm not sure if the >>>> heartbeats are reliable enough for this to work. >>>> >>>> >>>> How big do you expect the payload to become? >>>> >>>> >>>> 8. Failure cases: >>>> /Becket:/ >>>> /a) The TEs may remove the result partition while the RM does not// >>>> //know. In this case, the client will receive a runtime error and >>>> submit >>>> the// >>>> //full DAG to recompute the missing result partition. In this case, RM >>>> should// >>>> //release the incomplete global partition. How would RM be notified >>>> to do// >>>> //that?// >>>> //b) Is it possible the RM looses global partition metadata while// >>>> //the TE still host the data? For example, RM deletes the global >>>> partition// >>>> //entry while the release partition call to TE failed.// >>>> //c) What would happen if the JM fails before the global partitions// >>>> //are registered to RM? Are users exposed to resource leak if JM >>>> does not// >>>> //have HA?// >>>> //d) What would happen if the RM fails? Will TE release the// >>>> //partitions by themselves?/ >>>> >>>> 1.a) This is a good question that I haven't considered. This will >>>> likely >>>> require a heartbeat-like report of available partitions. >>>> >>>> >>>> The hearbeat based synchronization approach seems to crystalize as >>>> the way >>>> to go forward with this FLIP. >>>> >>>> >>>> >>>> 1.b) RM should only delete entries if it received an ack from the TE; >>>> otherwise we could easily end up leaking partitions. I believe I >>>> forgot >>>> writing this down. >>>> 1.c) As described in the FLIP the handoff to the RM must occur before >>>> partitions are promoted. >>>> If the JM fails during the handoff then the TE will cleanup all >>>> partitions since it lost the connection to the JM, and partitions >>>> weren't promoted yet. >>>> If the JM fails after the handoff but before the promotion, >>>> same as >>>> above. The RM would contain invalid entries in this case; see 1.a) . >>>> If the JM fails after the handoff and promotion partitions we >>>> don't >>>> leak anything since the RM is now fully responsible. >>>> 1.d) yes; if the connection to the RM is disrupted the TE will cleanup >>>> all global partitions, similar to how it cleans up all partitions >>>> associated with a given job if the connection to the corresponding >>>> JM is >>>> disrupted. >>>> >>>> 9. /Becket: It looks that TE should be the source of truth of the >>>> result >>>> partition// >>>> //existence. Does it have to distinguish between global and local >>>> result// >>>> //partitions? If TE does not need to distinguish them, it seems the >>>> the// >>>> //releasePartition() method in TE could just provide the list of >>>> partitions// >>>> //to release, without the partitions to promote./ >>>> >>>> The promotion is a hard requirement, as this is the signal to the TE >>>> that this partition is no longer bound to the life-cycle of a job. >>>> Without the promotion the TE would delete the partition once the JM >>>> has >>>> shutdown; this is a safety net to ensure cleanup of partitions in case >>>> of a disconnect. >>>> >>>> 10. /In the current design, RM should be able to release result// >>>> //partitions using ShuffleService. Will RM do this by sending RPC >>>> to the >>>> TEs?// >>>> //Or will the RM do it by itself?/ >>>> >>>> The RM will send a release call to each TM and issue a release call to >>>> the ShuffleMaster, just like the JobMaster handles partition releases. >>>> >>>> 11. /Becket: How do we plan to handle the case when there are >>>> different >>>> shuffle// >>>> //services in the same Flink cluster? For example, a shared >>>> standalone// >>>> //cluster./ >>>> >>>> This case is not considered; there are so many changes necessary in >>>> other parts of the runtime that we would jump the gun in addressing it >>>> here. >>>> Ultimately though, I would think that that the addition of a shuffle >>>> master instance ID and shuffle service identifier should suffice. >>>> >>>> The identifier is used in subsequent jobs to load the appropriate >>>> shuffle service for a given partition (think of it like a class name), >>>> while the shuffle master instance ID is used to differentiate between >>>> the different shuffle master instances running in the cluster (which >>>> partitions have to be associated with so we can issue the correct >>>> release calls). >>>> >>>> 12. /Becket: Minor: usually REST API uses `?` to pass the >>>> parameters. Is >>>> there a// >>>> //reason we use `:` instead?/ >>>> >>>> That's netty syntax for path parameters. >>>> >>>> On 30/09/2019 08:34, Becket Qin wrote: >>>> >>>> Forgot to say that I agree with Till that it seems a good idea to >>>> let TEs >>>> register the global partitions to the RM instead of letting JM do it. >>>> >>>> This >>>> >>>> simplifies quite a few things. >>>> >>>> Thanks, >>>> >>>> Jiangjie (Becket) Qin >>>> >>>> On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <[hidden email]> >>>> <[hidden email]> >>>> >>>> wrote: >>>> >>>> Hi Chesnay, >>>> >>>> Thanks for the proposal. My understanding of the entire workflow >>>> step by >>>> step is following: >>>> >>>> - JM maintains the local and global partition metadata when >>>> the task >>>> runs to create result partitions. The tasks themselves does not >>>> >>>> distinguish >>>> >>>> between local / global partitions. Only the JM knows that. >>>> - JM releases the local partitions as the job executes. When a >>>> job >>>> finishes successfully, JM registers the global partitions to the >>>> RM. The >>>> global partition IDs are set on the client instead of randomly >>>> >>>> generated, >>>> >>>> so the client can release global partitions using them. (It would be >>>> >>>> good >>>> >>>> to have some human readable string associated with the global result >>>> partitions). >>>> - Client issues REST call to list / release global partitions. >>>> >>>> A few thoughts / questions below: >>>> 1. Failure cases: >>>> * The TEs may remove the result partition while the RM >>>> does >>>> >>>> not >>>> >>>> know. In this case, the client will receive a runtime error and submit >>>> >>>> the >>>> >>>> full DAG to recompute the missing result partition. In this case, RM >>>> >>>> should >>>> >>>> release the incomplete global partition. How would RM be notified >>>> to do >>>> that? >>>> * Is it possible the RM looses global partition metadata >>>> >>>> while >>>> >>>> the TE still host the data? For example, RM deletes the global >>>> partition >>>> entry while the release partition call to TE failed. >>>> * What would happen if the JM fails before the global >>>> >>>> partitions >>>> >>>> are registered to RM? Are users exposed to resource leak if JM does >>>> not >>>> have HA? >>>> * What would happen if the RM fails? Will TE release the >>>> partitions by themselves? >>>> >>>> 2. It looks that TE should be the source of truth of the result >>>> >>>> partition >>>> >>>> existence. Does it have to distinguish between global and local result >>>> partitions? If TE does not need to distinguish them, it seems the the >>>> releasePartition() method in TE could just provide the list of >>>> >>>> partitions >>>> >>>> to release, without the partitions to promote. >>>> >>>> 3. In the current design, RM should be able to release result >>>> partitions using ShuffleService. Will RM do this by sending RPC to the >>>> >>>> TEs? >>>> >>>> Or will the RM do it by itself? >>>> >>>> 4. How do we plan to handle the case when there are different shuffle >>>> services in the same Flink cluster? For example, a shared standalone >>>> cluster. >>>> >>>> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a >>>> reason we use `:` instead? >>>> >>>> Thanks, >>>> >>>> Jiangjie (Becket) Qin >>>> >>>> On Tue, Sep 17, 2019 at 3:22 AM >>>> zhijiang<[hidden email]> >>>> <[hidden email]> wrote: >>>> >>>> >>>> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on >>>> >>>> my >>>> >>>> side. >>>> >>>> I also have some similar concerns which Till already proposed before. >>>> >>>> 1. The consistent terminology in different components. On JM side, >>>> PartitionTracker#getPersistedBlockingPartitions is defined for getting >>>> global partitions. And on RM side, we define the method of >>>> #registerGlobalPartitions correspondingly for handover the partitions >>>> >>>> from >>>> >>>> JM. I think it is better to unify the term in different components for >>>> >>>> for >>>> >>>> better understanding the semantic. Concering whether to use global or >>>> persistent, I prefer the "global" term personally. Because it >>>> >>>> describes the >>>> >>>> scope of partition clearly, and the "persistent" is more like the >>>> >>>> partition >>>> >>>> storing way or implementation detail. In other words, the global >>>> >>>> partition >>>> >>>> might also be cached in memory of TE, not must persist into files from >>>> semantic requirements. Whether memory or persistent file is just the >>>> implementation choice. >>>> >>>> 2. On TE side, we might rename the method #releasePartitions to >>>> #releaseOrPromotePartitions which describes the function precisely and >>>> keeps consistent with >>>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor(). >>>> >>>> 3. Very agree with Till's suggestions of global PartitionTable on TE >>>> >>>> side >>>> >>>> and sticking to TE's heartbeat report to RM for global partitions. >>>> >>>> 4. Considering ShuffleMaster, it was built inside JM and expected to >>>> interactive with JM before. Now the RM also needs to interactive with >>>> ShuffleMaster to release global partitions. Then it might be better to >>>> >>>> move >>>> >>>> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should >>>> >>>> be >>>> >>>> consistent with RM. >>>> >>>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global >>>> partitions for successful jobs" >>>> >>>> Best, >>>> Zhijiang >>>> >>>> >>>> ------------------------------------------------------------------ >>>> From:Till Rohrmann <[hidden email]> <[hidden email]> >>>> Send Time:2019年9月10日(星期二) 10:10 >>>> To:dev <[hidden email]> <[hidden email]> >>>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle >>>> >>>> Thanks Chesnay for drafting the FLIP and starting this discussion. >>>> >>>> I have a couple of comments: >>>> >>>> * I know that I've also coined the terms global/local result partition >>>> >>>> but >>>> >>>> maybe it is not the perfect name. Maybe we could rethink the >>>> >>>> terminology >>>> >>>> and call them persistent result partitions? >>>> * Nit: I would call the last parameter of void releasePartitions(JobID >>>> jobId, Collection<ResultPartitionID> partitionsToRelease, >>>> Collection<ResultPartitionID> partitionsToPromote) either >>>> partitionsToRetain or partitionsToPersistent. >>>> * I'm not sure whether partitionsToRelease should contain a >>>> global/persistent result partition id. I always thought that the user >>>> >>>> will >>>> >>>> be responsible for managing the lifecycle of a global/persistent >>>> result partition. >>>> * Instead of extending the PartitionTable to be able to store >>>> global/persistent and local/transient result partitions, I would >>>> rather >>>> introduce a global PartitionTable to store the global/persistent >>>> result >>>> partitions explicitly. I think there is a benefit in making things as >>>> explicit as possible. >>>> * The handover logic between the JM and the RM for the >>>> >>>> global/persistent >>>> >>>> result partitions seems a bit brittle to me. What will happen if >>>> the JM >>>> cannot reach the RM? I think it would be better if the TM announces >>>> the >>>> global/persistent result partitions to the RM via its heartbeats. That >>>> >>>> way >>>> >>>> we don't rely on an established connection between the JM and RM >>>> and we >>>> keep the TM as the ground of truth. Moreover, the RM should simply >>>> >>>> forward >>>> >>>> the release calls to the TM without much internal logic. >>>> >>>> Cheers, >>>> Till >>>> >>>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler >>>> <[hidden email]> <[hidden email]> >>>> wrote: >>>> >>>> >>>> Hello, >>>> >>>> FLIP-36 (interactive programming) >>>> < >>>> >>>> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink >>>> >>>> >>>> proposes a new programming paradigm where jobs are built incrementally >>>> by the user. >>>> >>>> To support this in an efficient manner I propose to extend partition >>>> life-cycle to support the notion of /global partitions/, which are >>>> partitions that can exist beyond the life-time of a job. >>>> >>>> These partitions could then be re-used by subsequent jobs in a fairly >>>> efficient manner, as they don't have to persisted to an external >>>> >>>> storage >>>> >>>> first and consuming tasks could be scheduled to exploit data-locality. >>>> >>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor >>>> and ResourceManager to support this from a life-cycle perspective. >>>> >>>> This FLIP does /not/ concern itself with the /usage/ of global >>>> partitions, including client-side APIs, job-submission, scheduling and >>>> reading said partitions; these are all follow-ups that will either be >>>> part of FLIP-36 or spliced out into separate FLIPs. >>>> >>>> >>>> >>>> >>>> > > |
In reply to this post by Chesnay Schepler-3
Are there any other opinions in regards to the naming scheme?
(local/global, promote) On 06/09/2019 15:16, Chesnay Schepler wrote: > Hello, > > FLIP-36 (interactive programming) > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink> > proposes a new programming paradigm where jobs are built incrementally > by the user. > > To support this in an efficient manner I propose to extend partition > life-cycle to support the notion of /global partitions/, which are > partitions that can exist beyond the life-time of a job. > > These partitions could then be re-used by subsequent jobs in a fairly > efficient manner, as they don't have to persisted to an external > storage first and consuming tasks could be scheduled to exploit > data-locality. > > The FLIP outlines the required changes on the JobMaster, TaskExecutor > and ResourceManager to support this from a life-cycle perspective. > > This FLIP does /not/ concern itself with the /usage/ of global > partitions, including client-side APIs, job-submission, scheduling and > reading said partitions; these are all follow-ups that will either be > part of FLIP-36 or spliced out into separate FLIPs. > > |
I think we should introduce a separate interface for the ResourceManager so
that it can list and delete global result partitions from the shuffle service implementation. As long as the JM and RM run in the same process, this interface could be implemented by the ShuffleMaster implementations. However, we should make sure that we don't introduce unnecessary concurrency. If that should be the case, then it might be simpler to have two separate components. Some ideas for the naming problem: local/global: job/cluster, intra/inter Cheers, Till On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <[hidden email]> wrote: > Are there any other opinions in regards to the naming scheme? > (local/global, promote) > > On 06/09/2019 15:16, Chesnay Schepler wrote: > > Hello, > > > > FLIP-36 (interactive programming) > > < > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink> > > > proposes a new programming paradigm where jobs are built incrementally > > by the user. > > > > To support this in an efficient manner I propose to extend partition > > life-cycle to support the notion of /global partitions/, which are > > partitions that can exist beyond the life-time of a job. > > > > These partitions could then be re-used by subsequent jobs in a fairly > > efficient manner, as they don't have to persisted to an external > > storage first and consuming tasks could be scheduled to exploit > > data-locality. > > > > The FLIP outlines the required changes on the JobMaster, TaskExecutor > > and ResourceManager to support this from a life-cycle perspective. > > > > This FLIP does /not/ concern itself with the /usage/ of global > > partitions, including client-side APIs, job-submission, scheduling and > > reading said partitions; these are all follow-ups that will either be > > part of FLIP-36 or spliced out into separate FLIPs. > > > > > > |
ooooh I like job-/cluster partitions.
On 10/10/2019 16:27, Till Rohrmann wrote: > I think we should introduce a separate interface for the ResourceManager so > that it can list and delete global result partitions from the shuffle > service implementation. As long as the JM and RM run in the same process, > this interface could be implemented by the ShuffleMaster implementations. > However, we should make sure that we don't introduce unnecessary > concurrency. If that should be the case, then it might be simpler to have > two separate components. > > Some ideas for the naming problem: > > local/global: job/cluster, intra/inter > > Cheers, > Till > > On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <[hidden email]> wrote: > >> Are there any other opinions in regards to the naming scheme? >> (local/global, promote) >> >> On 06/09/2019 15:16, Chesnay Schepler wrote: >>> Hello, >>> >>> FLIP-36 (interactive programming) >>> < >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink> >> >>> proposes a new programming paradigm where jobs are built incrementally >>> by the user. >>> >>> To support this in an efficient manner I propose to extend partition >>> life-cycle to support the notion of /global partitions/, which are >>> partitions that can exist beyond the life-time of a job. >>> >>> These partitions could then be re-used by subsequent jobs in a fairly >>> efficient manner, as they don't have to persisted to an external >>> storage first and consuming tasks could be scheduled to exploit >>> data-locality. >>> >>> The FLIP outlines the required changes on the JobMaster, TaskExecutor >>> and ResourceManager to support this from a life-cycle perspective. >>> >>> This FLIP does /not/ concern itself with the /usage/ of global >>> partitions, including client-side APIs, job-submission, scheduling and >>> reading said partitions; these are all follow-ups that will either be >>> part of FLIP-36 or spliced out into separate FLIPs. >>> >>> >> |
Sorry for delay catching up with the recent progress. Thanks for the FLIP update and valuable discussions!
I also like the term of job/cluster partitions, and agree with most of the previous comments. Only left one concern of ShuffleMaster side: >However, if the separation of JM/RM into separate processes, as outlined in FLIP-6, is ever fully realized it necessarily implies that multiple shuffle master instances may exist for a given shuffle service. My previous thought was that one ShuffleService factory is for creating one shuffleMaster instance. If we have multiple ShuffleMaster instances, we might also need differentt ShuffleService factories. And it seems that different ShuffleMaster instances could run in different components based on demands, e.g. dispatcher, JM, RM. Is it also feasible to not touch the ShuffleMaster concept in this FLIP to make things a bit easy? I mean the ShuffleMaster is still running in JM component and is responsbile for job partitions. For the case of cluster partitions, the RM could interact with TE directly. TE would report global partitions as payloads via heartbeat with RM. And the RM could call TE#releaseGlobalPartitions directly not via ShuffleMaster. Even the RM could also pass the global released partitions via payloads in heartbeat with TE to reduce additional explict RPC call, but this would bring some delays for releasing partition based on heartbeat interval. Best, Zhijiang ------------------------------------------------------------------ From:Chesnay Schepler <[hidden email]> Send Time:2019年10月11日(星期五) 10:21 To:dev <[hidden email]>; Till Rohrmann <[hidden email]> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle ooooh I like job-/cluster partitions. On 10/10/2019 16:27, Till Rohrmann wrote: > I think we should introduce a separate interface for the ResourceManager so > that it can list and delete global result partitions from the shuffle > service implementation. As long as the JM and RM run in the same process, > this interface could be implemented by the ShuffleMaster implementations. > However, we should make sure that we don't introduce unnecessary > concurrency. If that should be the case, then it might be simpler to have > two separate components. > > Some ideas for the naming problem: > > local/global: job/cluster, intra/inter > > Cheers, > Till > > On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <[hidden email]> wrote: > >> Are there any other opinions in regards to the naming scheme? >> (local/global, promote) >> >> On 06/09/2019 15:16, Chesnay Schepler wrote: >>> Hello, >>> >>> FLIP-36 (interactive programming) >>> < >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink> >> >>> proposes a new programming paradigm where jobs are built incrementally >>> by the user. >>> >>> To support this in an efficient manner I propose to extend partition >>> life-cycle to support the notion of /global partitions/, which are >>> partitions that can exist beyond the life-time of a job. >>> >>> These partitions could then be re-used by subsequent jobs in a fairly >>> efficient manner, as they don't have to persisted to an external >>> storage first and consuming tasks could be scheduled to exploit >>> data-locality. >>> >>> The FLIP outlines the required changes on the JobMaster, TaskExecutor >>> and ResourceManager to support this from a life-cycle perspective. >>> >>> This FLIP does /not/ concern itself with the /usage/ of global >>> partitions, including client-side APIs, job-submission, scheduling and >>> reading said partitions; these are all follow-ups that will either be >>> part of FLIP-36 or spliced out into separate FLIPs. >>> >>> >> |
I think we won't necessarily run multiple ShuffleMasters. I think it would
be better to pass in a leaner interface into the RM to only handle the deletion of the global result partitions. Letting the TEs handle the deletion of the global result partitions might work as long as we don't have an external shuffle service implementation. Hence, it could be a first step to decrease complexity but in order to complete this feature, I think we need to do it differently. Cheers, Till On Sat, Oct 12, 2019 at 7:39 AM zhijiang <[hidden email]> wrote: > Sorry for delay catching up with the recent progress. Thanks for the FLIP > update and valuable discussions! > > I also like the term of job/cluster partitions, and agree with most of the > previous comments. > > Only left one concern of ShuffleMaster side: > >However, if the separation of JM/RM into separate processes, as outlined > in FLIP-6, is ever fully realized it necessarily implies that multiple > shuffle master instances may exist for a given shuffle service. > > My previous thought was that one ShuffleService factory is for creating > one shuffleMaster instance. If we have multiple ShuffleMaster instances, we > might also need differentt ShuffleService factories. > And it seems that different ShuffleMaster instances could run in different > components based on demands, e.g. dispatcher, JM, RM. > > Is it also feasible to not touch the ShuffleMaster concept in this FLIP to > make things a bit easy? I mean the ShuffleMaster is still running in JM > component and is responsbile for job partitions. For the case of cluster > partitions, the RM could interact with TE directly. TE would report global > partitions as payloads via heartbeat with RM. And the RM could call > TE#releaseGlobalPartitions directly not via ShuffleMaster. Even the RM > could also pass the global released partitions via payloads in heartbeat > with TE to reduce additional explict RPC call, but this would bring some > delays for releasing partition based on heartbeat interval. > > Best, > Zhijiang > ------------------------------------------------------------------ > From:Chesnay Schepler <[hidden email]> > Send Time:2019年10月11日(星期五) 10:21 > To:dev <[hidden email]>; Till Rohrmann <[hidden email]> > Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle > > ooooh I like job-/cluster partitions. > > On 10/10/2019 16:27, Till Rohrmann wrote: > > I think we should introduce a separate interface for the ResourceManager > so > > that it can list and delete global result partitions from the shuffle > > service implementation. As long as the JM and RM run in the same process, > > this interface could be implemented by the ShuffleMaster implementations. > > However, we should make sure that we don't introduce unnecessary > > concurrency. If that should be the case, then it might be simpler to have > > two separate components. > > > > Some ideas for the naming problem: > > > > local/global: job/cluster, intra/inter > > > > Cheers, > > Till > > > > On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <[hidden email]> > wrote: > > > >> Are there any other opinions in regards to the naming scheme? > >> (local/global, promote) > >> > >> On 06/09/2019 15:16, Chesnay Schepler wrote: > >>> Hello, > >>> > >>> FLIP-36 (interactive programming) > >>> < > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink > > > >> > >>> proposes a new programming paradigm where jobs are built incrementally > >>> by the user. > >>> > >>> To support this in an efficient manner I propose to extend partition > >>> life-cycle to support the notion of /global partitions/, which are > >>> partitions that can exist beyond the life-time of a job. > >>> > >>> These partitions could then be re-used by subsequent jobs in a fairly > >>> efficient manner, as they don't have to persisted to an external > >>> storage first and consuming tasks could be scheduled to exploit > >>> data-locality. > >>> > >>> The FLIP outlines the required changes on the JobMaster, TaskExecutor > >>> and ResourceManager to support this from a life-cycle perspective. > >>> > >>> This FLIP does /not/ concern itself with the /usage/ of global > >>> partitions, including client-side APIs, job-submission, scheduling and > >>> reading said partitions; these are all follow-ups that will either be > >>> part of FLIP-36 or spliced out into separate FLIPs. > >>> > >>> > >> > > |
I'm quite torn on whether to exclude the ShuffleServices from the
proposal. I think I'm now on my third or fourth iteration for a response, so I'll just send both so I can stop thinking for a bit about whether to push for one or the other: Opinion A, aka "Nu Uh": I'm not in favor of excluding the shuffle master from this proposal; I believe it raises interesting questions that should be discussed beforehand; otherwise we may just end up developing ourselves into a corner. Unless there are good reasons for doing so I'd prefer to keep the functionality across shuffle services consistent. And man, my last sentence is giving me headaches (how can you introduce inconsistencies across shuffle services if you don't even touch them?..) Ultimately the RM only needs the ShuffleService for 2 things, which are fairly straight-forward: 1. list partitions 2. delete partitions Both of these are /exclusively /used via the REST APIs. In terms of scope I wanted this proposal to contain something that feels complete. If there is functionality to have a partition stick around, there needs to be a mechanism to delete it. Thus you also need a way to list them, simply for practical purposes. I do believe that without these this whole proposal is very much incomplete and would hate to see them excluded. It just /makes sense/ to have them. Yes, technically speak Could we exclude the external shuffle services from this logic? Sure, but I'm quite worried that we will not tackle this problem again for 1.10, and if we don't we end up with really inconsistent behavior across versions. In 1.9 you can have local state in your master implementation, and, bar extraordinary circumstances, will get a release call for partition that was registered. In 1.10 that last part that goes down the drain, and in 1.X the last part is back in play but you can't have local state anymore since another instance is running on the RM. Who is even supposed to keep up with that? It's still an interface that is exposed to every user. I don't think we should impose constraints in such a cut loose fashion. At last, the fact that we can implement this in a way where it works for some shuffle services and not others should already be quite a red flag. The RM maybe shouldn't do any tracking and just forward the heartbeat payload to the ThinShuffleMaster present on the RM. Opinion B, aka "technically it would be fine" The counterpoint to the whole REST API completeness argument is that while the /runtime //supports /having partitions stick around, there is technically no way for anyone to enable such behavior at runtime. Hence, with no user-facing APIs to enable the feature, we don't necessarily need a user-facing API for management purposes, and could defer both to a later point where this feature is exposed fully to users. But then it's hard to justify having any communication between the TE and RM at all; it literally serves no purpose. The TE could just keep cluster partitions around until the RM disconnects. Which would then also raise the question what exactly of substance is left in this proposal. @Till yes, the RM should work against a different interface; I don't think anyone has argued against that. Let's put this point to rest. :) On 13/10/2019 11:04, Till Rohrmann wrote: > I think we won't necessarily run multiple ShuffleMasters. I think it would > be better to pass in a leaner interface into the RM to only handle the > deletion of the global result partitions. > > Letting the TEs handle the deletion of the global result partitions might > work as long as we don't have an external shuffle service implementation. > Hence, it could be a first step to decrease complexity but in order to > complete this feature, I think we need to do it differently. > > Cheers, > Till > > On Sat, Oct 12, 2019 at 7:39 AM zhijiang <[hidden email]> > wrote: > >> Sorry for delay catching up with the recent progress. Thanks for the FLIP >> update and valuable discussions! >> >> I also like the term of job/cluster partitions, and agree with most of the >> previous comments. >> >> Only left one concern of ShuffleMaster side: >>> However, if the separation of JM/RM into separate processes, as outlined >> in FLIP-6, is ever fully realized it necessarily implies that multiple >> shuffle master instances may exist for a given shuffle service. >> >> My previous thought was that one ShuffleService factory is for creating >> one shuffleMaster instance. If we have multiple ShuffleMaster instances, we >> might also need differentt ShuffleService factories. >> And it seems that different ShuffleMaster instances could run in different >> components based on demands, e.g. dispatcher, JM, RM. >> >> Is it also feasible to not touch the ShuffleMaster concept in this FLIP to >> make things a bit easy? I mean the ShuffleMaster is still running in JM >> component and is responsbile for job partitions. For the case of cluster >> partitions, the RM could interact with TE directly. TE would report global >> partitions as payloads via heartbeat with RM. And the RM could call >> TE#releaseGlobalPartitions directly not via ShuffleMaster. Even the RM >> could also pass the global released partitions via payloads in heartbeat >> with TE to reduce additional explict RPC call, but this would bring some >> delays for releasing partition based on heartbeat interval. >> >> Best, >> Zhijiang >> ------------------------------------------------------------------ >> From:Chesnay Schepler <[hidden email]> >> Send Time:2019年10月11日(星期五) 10:21 >> To:dev <[hidden email]>; Till Rohrmann <[hidden email]> >> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle >> >> ooooh I like job-/cluster partitions. >> >> On 10/10/2019 16:27, Till Rohrmann wrote: >>> I think we should introduce a separate interface for the ResourceManager >> so >>> that it can list and delete global result partitions from the shuffle >>> service implementation. As long as the JM and RM run in the same process, >>> this interface could be implemented by the ShuffleMaster implementations. >>> However, we should make sure that we don't introduce unnecessary >>> concurrency. If that should be the case, then it might be simpler to have >>> two separate components. >>> >>> Some ideas for the naming problem: >>> >>> local/global: job/cluster, intra/inter >>> >>> Cheers, >>> Till >>> >>> On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <[hidden email]> >> wrote: >>>> Are there any other opinions in regards to the naming scheme? >>>> (local/global, promote) >>>> >>>> On 06/09/2019 15:16, Chesnay Schepler wrote: >>>>> Hello, >>>>> >>>>> FLIP-36 (interactive programming) >>>>> < >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink >>>>> proposes a new programming paradigm where jobs are built incrementally >>>>> by the user. >>>>> >>>>> To support this in an efficient manner I propose to extend partition >>>>> life-cycle to support the notion of /global partitions/, which are >>>>> partitions that can exist beyond the life-time of a job. >>>>> >>>>> These partitions could then be re-used by subsequent jobs in a fairly >>>>> efficient manner, as they don't have to persisted to an external >>>>> storage first and consuming tasks could be scheduled to exploit >>>>> data-locality. >>>>> >>>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor >>>>> and ResourceManager to support this from a life-cycle perspective. >>>>> >>>>> This FLIP does /not/ concern itself with the /usage/ of global >>>>> partitions, including client-side APIs, job-submission, scheduling and >>>>> reading said partitions; these are all follow-ups that will either be >>>>> part of FLIP-36 or spliced out into separate FLIPs. >>>>> >>>>> >> |
In reply to this post by Till Rohrmann
Thanks for the further explanation Till!
It is fine for me to run only one ShuffleMaster instance as now, and make RM handle the deletion of cluster partitions in a light-weight way. I also have no concerns of letting TE handle the deletion of cluster partititions as did for job partitions now. Best, Zhijiang ------------------------------------------------------------------ From:Till Rohrmann <[hidden email]> Send Time:2019年10月13日(星期日) 17:04 To:zhijiang <[hidden email]> Cc:dev <[hidden email]> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle I think we won't necessarily run multiple ShuffleMasters. I think it would be better to pass in a leaner interface into the RM to only handle the deletion of the global result partitions. Letting the TEs handle the deletion of the global result partitions might work as long as we don't have an external shuffle service implementation. Hence, it could be a first step to decrease complexity but in order to complete this feature, I think we need to do it differently. Cheers, Till On Sat, Oct 12, 2019 at 7:39 AM zhijiang <[hidden email]> wrote: > Sorry for delay catching up with the recent progress. Thanks for the FLIP > update and valuable discussions! > > I also like the term of job/cluster partitions, and agree with most of the > previous comments. > > Only left one concern of ShuffleMaster side: > >However, if the separation of JM/RM into separate processes, as outlined > in FLIP-6, is ever fully realized it necessarily implies that multiple > shuffle master instances may exist for a given shuffle service. > > My previous thought was that one ShuffleService factory is for creating > one shuffleMaster instance. If we have multiple ShuffleMaster instances, we > might also need differentt ShuffleService factories. > And it seems that different ShuffleMaster instances could run in different > components based on demands, e.g. dispatcher, JM, RM. > > Is it also feasible to not touch the ShuffleMaster concept in this FLIP to > make things a bit easy? I mean the ShuffleMaster is still running in JM > component and is responsbile for job partitions. For the case of cluster > partitions, the RM could interact with TE directly. TE would report global > partitions as payloads via heartbeat with RM. And the RM could call > TE#releaseGlobalPartitions directly not via ShuffleMaster. Even the RM > could also pass the global released partitions via payloads in heartbeat > with TE to reduce additional explict RPC call, but this would bring some > delays for releasing partition based on heartbeat interval. > > Best, > Zhijiang > ------------------------------------------------------------------ > From:Chesnay Schepler <[hidden email]> > Send Time:2019年10月11日(星期五) 10:21 > To:dev <[hidden email]>; Till Rohrmann <[hidden email]> > Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle > > ooooh I like job-/cluster partitions. > > On 10/10/2019 16:27, Till Rohrmann wrote: > > I think we should introduce a separate interface for the ResourceManager > so > > that it can list and delete global result partitions from the shuffle > > service implementation. As long as the JM and RM run in the same process, > > this interface could be implemented by the ShuffleMaster implementations. > > However, we should make sure that we don't introduce unnecessary > > concurrency. If that should be the case, then it might be simpler to have > > two separate components. > > > > Some ideas for the naming problem: > > > > local/global: job/cluster, intra/inter > > > > Cheers, > > Till > > > > On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <[hidden email]> > wrote: > > > >> Are there any other opinions in regards to the naming scheme? > >> (local/global, promote) > >> > >> On 06/09/2019 15:16, Chesnay Schepler wrote: > >>> Hello, > >>> > >>> FLIP-36 (interactive programming) > >>> < > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink > > > >> > >>> proposes a new programming paradigm where jobs are built incrementally > >>> by the user. > >>> > >>> To support this in an efficient manner I propose to extend partition > >>> life-cycle to support the notion of /global partitions/, which are > >>> partitions that can exist beyond the life-time of a job. > >>> > >>> These partitions could then be re-used by subsequent jobs in a fairly > >>> efficient manner, as they don't have to persisted to an external > >>> storage first and consuming tasks could be scheduled to exploit > >>> data-locality. > >>> > >>> The FLIP outlines the required changes on the JobMaster, TaskExecutor > >>> and ResourceManager to support this from a life-cycle perspective. > >>> > >>> This FLIP does /not/ concern itself with the /usage/ of global > >>> partitions, including client-side APIs, job-submission, scheduling and > >>> reading said partitions; these are all follow-ups that will either be > >>> part of FLIP-36 or spliced out into separate FLIPs. > >>> > >>> > >> > > |
Free forum by Nabble | Edit this page |