Hi everyone,
We would like to start a discussion thread on "FLIP-118: Improve Flink’s ID system" [1].

This FLIP mainly discusses the following issues, aiming to enhance the readability of IDs in logs and to help users debug failures:

- Enhance the readability of the string literals of IDs. Most of them are hash codes, e.g. ExecutionAttemptID, which provide little meaningful information and are hard for users to recognize and compare.
- Log the ID’s lineage information to make debugging more convenient. Currently, the logs do not always show the lineage information between IDs. Finding the relationships between entities identified by given IDs is a common need, e.g., which AllocationID's slot is assigned to satisfy the slot request with a given SlotRequestID. Without such lineage information, it is currently impossible to track the end-to-end lifecycle of an Execution or a Task, which makes debugging difficult.

Key changes proposed in the FLIP are as follows:

- Add location information to distributed components
- Add topology information to graph components
- Log the ID’s lineage information
- Expose the identifier of distributing component to user

Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521

Best,
Yangze Guo
Hi Yangze,
thanks for creating this FLIP. I think it is a very good improvement that helps our users and ourselves better understand what's going on in Flink.

Creating the ResourceIDs with host information/pod name is a good idea.

Also deriving ExecutionGraph IDs from their superset ID is a good idea.

The InstanceID is used for fencing purposes. I would not make it a composition of the ResourceID + a monotonically increasing number. The problem is that in case of an RM failure the InstanceIDs would start from 0 again and this could lead to collisions.

Logging more information on how the different runtime IDs are correlated is also a good idea.

Two other ideas for simplifying the IDs are the following:

* The SlotRequestID was introduced because the SlotPool was a separate RpcEndpoint a while ago. With this no longer being the case I think we could remove the SlotRequestID and replace it with the AllocationID.
* Instead of creating new SlotRequestIDs for multi task slots one could derive them from the SlotRequestID used for requesting the underlying AllocatedSlot.

Given that the slot sharing logic will most likely be reworked with the pipelined region scheduling, we might be able to resolve these two points as part of the pipelined region scheduling effort.

Cheers,
Till

On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <[hidden email]> wrote:
> [...]
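
The fencing concern raised above (a counter that restarts from 0 after an RM failure) can be illustrated with a small sketch. It is written in Python purely for brevity and is not Flink code: `CounterInstanceIdGenerator`, `random_instance_id`, and the resource name `tm-host-1` are hypothetical names chosen for this example.

```python
import itertools
import uuid


class CounterInstanceIdGenerator:
    """Hypothetical generator: ResourceID plus an in-memory counter."""

    def __init__(self) -> None:
        # The counter lives only in memory, so it starts at 0 again
        # whenever a new generator (i.e. a restarted RM) is created.
        self._counter = itertools.count()

    def next_id(self, resource_id: str) -> str:
        return f"{resource_id}-{next(self._counter)}"


def random_instance_id() -> str:
    """Alternative: a random 128-bit ID that does not depend on RM state."""
    return uuid.uuid4().hex


# The ResourceManager registers a TaskExecutor before the failure.
rm_before = CounterInstanceIdGenerator()
id_before = rm_before.next_id("tm-host-1")

# A recovered ResourceManager starts with a fresh counter.
rm_after = CounterInstanceIdGenerator()
id_after = rm_after.next_id("tm-host-1")

# Both registrations receive the same ID, so a message fenced with the
# old ID cannot be told apart from one carrying the new ID.
ids_collide = id_before == id_after
```

A randomly generated ID avoids this because it carries no state that a restart could reset, at the cost of being less readable in logs.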
Hi Yangze, Hi Till,
thank you for working on this topic. I believe it will make debugging large Apache Flink deployments much more feasible.

I was wondering whether it would make sense to allow the user to specify the Resource ID in standalone setups? For example, many users still implicitly use standalone clusters on Kubernetes (the native support is still experimental) and in these cases it would be interesting to also set the PodName as the ResourceID. What do you think?

Cheers,

Konstantin

On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <[hidden email]> wrote:
> [...]

--
Konstantin Knauf | Head of Product
Ververica <https://www.ververica.com/>
Hi Konstantin,
I think it is a good idea. Currently, our users also report a similar issue with the resourceId of standalone clusters. When we start a standalone cluster now, the `TaskManagerRunner` always generates a UUID for the resourceId. It is used to register with the jobmanager and is not convenient to match with the real taskmanager, especially in container environments.

I think a probable solution is to support a user-defined resourceId. We could get it from the environment. For standalone on K8s, we could set the "RESOURCE_ID" env to the pod name so that it is easier to match the taskmanager with the K8s pod.

Moreover, I am afraid we cannot set the pod name to the resourceId. I think you could set "deployment.meta.name", since the pod name is generated by K8s in the pattern {deployment.meta.name}-{rc.uuid}-{uuid}. Instead, we will set the resourceId to the pod name.

Best,
Yang

On Sun, Mar 29, 2020 at 8:06 PM Konstantin Knauf <[hidden email]> wrote:
> [...]
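
The suggestion above (take the resourceId from the environment, otherwise keep generating a random one) can be sketched as follows. This is a language-neutral illustration in Python, not the actual `TaskManagerRunner` logic: the function `resolve_resource_id` is a hypothetical name, and only the variable name `RESOURCE_ID` comes from the message above.

```python
import os
import uuid
from typing import Mapping, Optional


def resolve_resource_id(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the user-defined resource ID if present, else a random one.

    `env` defaults to the process environment; passing a plain dict makes
    the function easy to exercise in isolation.
    """
    env = os.environ if env is None else env
    configured = env.get("RESOURCE_ID", "").strip()
    if configured:
        # The user is responsible for keeping this unique across TMs.
        return configured
    # Fallback: an opaque random ID, as in the current behaviour
    # described above.
    return uuid.uuid4().hex
```

For example, `resolve_resource_id({"RESOURCE_ID": "flink-taskmanager-abc"})` returns the configured value unchanged, while an empty environment yields a random hex string. On Kubernetes, the variable could be populated from the pod name, for instance through the Downward API field `metadata.name`.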
+1 on allowing user defined resourceId for taskmanager
On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <[hidden email]> wrote:
> [...]
Thank you all for the feedback! Sorry for the belated reply.
@Till
I'm +1 for your two ideas, and I'd like to move them out of the scope of this FLIP since the pipelined region scheduling is ongoing work now.
I also agree that we should not compose the InstanceID in TaskExecutorConnection from the ResourceID plus a monotonically increasing value. Thanks a lot for your explanation.

@Konstantin @Yang
Regarding the PodName of TaskExecutor on K8s, I second Yang's suggestion. It makes sense to me to let the user export RESOURCE_ID and make the TM respect it. The user needs to guarantee that there is no collision between different TMs.

Best,
Yangze Guo

On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <[hidden email]> wrote:
> [...]
Thanks for proposing this improvement, Yangze. Big +1 for the overall proposal. It can help a lot in troubleshooting.

Here are a few questions about it:

1. Shall we make JobVertexID a composition of JobID and a topology index? This would help in the session cluster case, so that we can identify which tasks are from which jobs along with the rework of ExecutionAttemptID.

2. You mentioned that "Add the producer info to the string literal of IntermediateDataSetID". Do you mean to make IntermediateDataSetID a composition of JobVertexID and a consumer index? How about we still keep IntermediateDataSetID independent from JobVertexID, but just print the producing relationships in logs? I think keeping IntermediateDataSetID independent may be better considering the cross-job result usages in interactive query cases.

3. The new IDs will become larger with this rework. The TaskDeploymentDescriptor can become much larger since it is mainly composed of a variety of IDs. I'm not sure how much larger it would be, but there can be more memory and CPU cost for it, which could result in more frequent GCs, message sizes exceeding the akka frame limit, and a longer blocked time of the main thread. This should not be a problem in most cases but might be a problem for large-scale jobs. Shall we have a benchmark for it?

Thanks,
Zhu Zhu

On Tue, Mar 31, 2020 at 2:19 PM Yangze Guo <[hidden email]> wrote:
> [...]
Hi, Zhu,
Thanks for the feedback.

> make JobVertexID a composition of JobID and a topology index

I think it is a good idea. However, it seems the JobVertexID is derived from a hash code that is used to identify vertices across submissions. I'm not familiar with that component, though. I would prefer to keep this idea out of the scope of this FLIP unless someone can help us figure it out.

> How about we still keep IntermediateDataSetID independent from JobVertexID,
> but just print the producing relationships in logs? I think keeping
> IntermediateDataSetID independent may be better considering the cross job
> result usages in interactive query cases.

I think you are right. I'll keep IntermediateDataSetID independent from JobVertexID.

> The new IDs will become larger with this rework.

Yes, I have the same concern. A benchmark is necessary; I'll try to provide one during the implementation phase.

Best,
Yangze Guo

On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <[hidden email]> wrote:
>
> Thanks for proposing this improvement Yangze. Big +1 for the overall
> proposal. It can help a lot in troubleshooting.
>
> Here are a few questions for it:
> 1. Shall we make JobVertexID a composition of JobID and a topology index?
> This would help in the session cluster case, so that we can identify which
> tasks are from which jobs along with the rework of ExecutionAttemptID.
>
> 2. You mentioned that "Add the producer info to the string literal of
> IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
> composition of JobVertexID and a consumer index?
> How about we still keep IntermediateDataSetID independent from JobVertexID,
> but just print the producing relationships in logs? I think keeping
> IntermediateDataSetID independent may be better considering the cross job
> result usages in interactive query cases.
>
> 3. The new IDs will become larger with this rework. The
> TaskDeploymentDescriptor can become much larger since it is mainly composed
> of a variety of IDs. I'm not sure how much it would be but there can be more
> memory and CPU cost for it, and results in more frequent GCs, message size
> exceeding akka frame limits, and a longer blocked time of main thread.
> This should not be a problem in most cases but might be a problem for large
> scale jobs. Shall we have a benchmark for it?
>
> Thanks,
> Zhu Zhu
|
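A minimal sketch of the "keep IntermediateDataSetID independent from JobVertexID, but just print the producing relationships in logs" option discussed above. The names `SketchId` and `LineageLog` are hypothetical stand-ins invented for illustration and are not Flink classes; the point is only that the two IDs carry no reference to each other, and the relationship exists solely in the log line.

```java
import java.util.UUID;

// Hypothetical stand-in for an independently generated ID such as
// JobVertexID or IntermediateDataSetID; this is not Flink's real class.
final class SketchId {
    private final UUID value;

    SketchId(UUID value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return value.toString();
    }
}

// Hypothetical helper: the IDs stay independent, and the producing
// relationship is recorded only as text for the log.
final class LineageLog {
    private LineageLog() {}

    static String producedBy(SketchId dataSetId, SketchId producerVertexId) {
        return String.format(
                "IntermediateDataSet %s is produced by JobVertex %s",
                dataSetId, producerVertexId);
    }
}
```

A caller would hand the returned string to its logger at the point where the data set is created, so the lineage can later be recovered by searching the log for either ID.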
For the IntermediateDataSetID I was just thinking that it might actually be interesting to know which job produced the result which, by using cluster partitions, could be used across different jobs. Not saying that we have to do it, though.

A small addition to Zhu Zhu's comment about TDD sizes: For the problem with too large TDDs there is already an issue [1]. The current suspicion is that the size of TDDs for jobs with a large parallelism can indeed become problematic for Flink. Hence, it would be great to investigate the impacts of the proposed changes.

[1] https://issues.apache.org/jira/browse/FLINK-16069

Cheers,
Till

On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <[hidden email]> wrote:

> Hi, Zhu,
>
> Thanks for the feedback.
>
> > make JobVertexID a composition of JobID and a topology index
> I think it is a good idea. However, it seems the JobVertexID is
> derived from hashcode which used to identify them across submission.
> I'm not familiar with that component though. I prefer to keep this
> idea out the scope of this FLIP if no one could help us to figure it
> out.
>
> > How about we still keep IntermediateDataSetID independent from JobVertexID,
> > but just print the producing relationships in logs? I think keeping
> > IntermediateDataSetID independent may be better considering the cross job
> > result usages in interactive query cases.
> I think you are right. I'll keep IntermediateDataSetID independent
> from JobVertexID.
>
> > The new IDs will become larger with this rework.
> Yes, I also have the same concern. Benchmark is necessary, I'll try to
> provide one during the implementation phase.
>
> Best,
> Yangze Guo
|
>> However, it seems the JobVertexID is derived from hashcode ...

You are right. JobVertexID is widely used and reworking it may affect the public interfaces, e.g. the REST API. We can take it as a long-term goal and exclude it from this FLIP. The same applies to IntermediateDataSetID, which could also be composed of a JobID and an index, as Till proposed.

Thanks,
Zhu Zhu

On Tue, Mar 31, 2020 at 8:36 PM Till Rohrmann <[hidden email]> wrote:

> For the IntermediateDataSetID I was just thinking that it might actually be
> interesting to know which job produced the result which, by using cluster
> partitions, could be used across different jobs. Not saying that we have to
> do it, though.
>
> A small addition to Zhu Zhu's comment about TDD sizes: For the problem with
> too large TDDs there is already an issue [1]. The current suspicion is that
> the size of TDDs for jobs with a large parallelism can indeed become
> problematic for Flink. Hence, it would be great to investigate the impacts
> of the proposed changes.
>
> [1] https://issues.apache.org/jira/browse/FLINK-16069
>
> Cheers,
> Till
|
Hi there,
Sorry for the belated reply. I have just done a preliminary investigation of the impact of refactoring IntermediateResultPartitionID.

The key change is to compose it of an IntermediateDataSetID and a partitionNum:

    public class IntermediateResultPartitionID {
        private final IntermediateDataSetID intermediateDataSetID;
        private final int partitionNum;
    }

In this benchmark, I used examples/streaming/WordCount.jar as the test job and ran Flink on Yarn. All configurations were kept at their defaults except for "taskmanager.numberOfTaskSlots", which was set to 2.

The results show that the change does have an impact on performance:
- After this change, the maximum parallelism went from 760 to 685, which is limited by the total number of network buffers. For the same parallelism, the user needs more network buffers than before.
- The netty messages "PartitionRequest" and "TaskEventRequest" grow by 4 bytes. For "PartitionRequest", that is a 7% increase.
- The TDD (TaskDeploymentDescriptor) takes longer to construct. With a parallelism of 600, it takes twice as long to construct the TDD as before.

Details are recorded in https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing

The same issue could occur with ExecutionAttemptID, which would increase "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex and attemptNumber). But it may not affect the TDD as much as IntermediateResultPartitionID, since there is only one ExecutionAttemptID per TDD.

After this preliminary investigation, I tend not to refactor ExecutionAttemptID and IntermediateResultPartitionID at the moment, or to treat it as future work.

WDYT? @ZhuZhu @Till

Best,
Yangze Guo
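For readers following along, the composed ID discussed in this message can be sketched roughly as below. This is only an illustration under assumptions, not Flink's actual classes: `DataSetIdSketch`, `PartitionIdSketch`, `ComposedIdDemo`, and the `#` separator in `toString()` are hypothetical names and choices made up for this example. The extra `int` field is 4 bytes, which is consistent with the 4-byte growth reported for the netty messages.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for IntermediateDataSetID (a 16-byte random ID).
final class DataSetIdSketch {
    private final UUID value;

    DataSetIdSketch(UUID value) {
        this.value = Objects.requireNonNull(value);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof DataSetIdSketch && value.equals(((DataSetIdSketch) o).value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public String toString() {
        return value.toString();
    }
}

// Hypothetical partition ID composed of the parent data set ID and a partition number.
final class PartitionIdSketch {
    private final DataSetIdSketch dataSetId;
    private final int partitionNum;

    PartitionIdSketch(DataSetIdSketch dataSetId, int partitionNum) {
        this.dataSetId = Objects.requireNonNull(dataSetId);
        this.partitionNum = partitionNum;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionIdSketch)) {
            return false;
        }
        PartitionIdSketch other = (PartitionIdSketch) o;
        return partitionNum == other.partitionNum && dataSetId.equals(other.dataSetId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(dataSetId, partitionNum);
    }

    // The string form exposes the lineage: parent ID first, then the partition number.
    @Override
    public String toString() {
        return dataSetId + "#" + partitionNum;
    }
}

final class ComposedIdDemo {
    public static void main(String[] args) {
        DataSetIdSketch dataSet = new DataSetIdSketch(UUID.randomUUID());
        // Both partitions print with the same prefix, so they are easy to relate in logs.
        System.out.println(new PartitionIdSketch(dataSet, 0));
        System.out.println(new PartitionIdSketch(dataSet, 1));
    }
}
```

With such a layout, two partitions of the same data set share a common prefix in the log output, which is the readability gain the FLIP is after; the cost is the additional field carried in every message that contains the ID.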
|
Hi everyone,
After an offline discussion with ZhuZhu, I have some comments on this investigation.

Regarding the maximum parallelism dropping from 760 to 685, this may be because the tasks were not scheduled evenly. The result was inconsistent across multiple experiments, so this phenomenon is probably unrelated to our changes.

I think what we really care about is the Akka framesize (should users have to enlarge it after our change for the same job?). The size of the TDD after serialization seems to be smaller after the change. I don't know the root cause of this at the moment. The way I measure it is:
```
// Serialize the TaskDeploymentDescriptor with Java serialization into an in-memory buffer.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(deployment);
oos.flush();
// Log the number of serialized bytes.
LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
```
Please correct me if I'm wrong.

I'll experiment with higher parallelism to see if there is any regression regarding the Akka framesize.

Regarding the TDD building time, the parallelism in my investigation might have been too small. Meanwhile, this time might be influenced by many factors. Thus, I'll
- experiment with higher parallelism.
- measure the time from "Starting scheduling" until the last task changes its state to running.

Best,
Yangze Guo
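As a self-contained illustration of the measurement approach described above (Java serialization into an in-memory buffer, then counting bytes), here is a minimal sketch. It assumes nothing about Flink: `SerializedSizeSketch` and its `Payload` class are hypothetical stand-ins for the real TaskDeploymentDescriptor, and the numbers it prints say nothing about actual TDD sizes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializedSizeSketch {

    // Hypothetical payload standing in for a TaskDeploymentDescriptor.
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long[] ids;

        Payload(int count) {
            this.ids = new long[count];
        }
    }

    // Returns the number of bytes produced by Java serialization of the given object.
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes all buffered data into bos.
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    public static void main(String[] args) {
        System.out.println("small payload bytes: " + serializedSize(new Payload(10)));
        System.out.println("large payload bytes: " + serializedSize(new Payload(1000)));
    }
}
```

Note that Java serialization writes a class descriptor only once per stream and refers back to it afterwards, so the serialized size is not simply the sum of field sizes. That might be one thing to check when the measured size moves in an unexpected direction, though this is only a guess and not a confirmed explanation of the result reported here.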
|
Hi everyone,
I've investigated the impact with higher-parallelism jobs. The results show:

- The size of the TDD after serialization becomes smaller than before. Also, I did not run into any issue with the Akka framework when the parallelism was set to 6000.
- There was no significant difference in the end-to-end scheduling time, job runtime, young GC count, or total full GC time.

For details, please take a look at https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing.

From my perspective, I think it might be OK to refactor ExecutionAttemptID and IntermediateResultPartitionID. If you have further concerns or think we should investigate further, please let me know.

Best,
Yangze Guo

On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <[hidden email]> wrote:
> Hi everyone,
>
> After an offline discussion with ZhuZhu, I have some comments on this investigation.
>
> Regarding the maximum parallelism going from 760 to 685: this may be because the tasks were not scheduled evenly. The result is inconsistent across multiple experiments, so this phenomenon is probably unrelated to our changes.
>
> I think what we really care about is the framesize for Akka (should users enlarge it after our change for the same job?). The size of the TDD after serialization seems to be smaller after the change. I don't know the root cause of this at the moment. The way I measure it is:
> ```
> ByteArrayOutputStream bos = new ByteArrayOutputStream();
> ObjectOutputStream oos = new ObjectOutputStream(bos);
> oos.writeObject(deployment);
> oos.flush();
> LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
> ```
> Please correct me if I'm wrong.
>
> I'll experiment with higher parallelism to see if there is any regression regarding the Akka framesize.
>
> Regarding the TDD building time, the parallelism in my investigation might be too small. Also, this time might be influenced by many factors. Thus, I'll
> - experiment with higher parallelism.
> - measure the time spent from "Starting scheduling" until the last task changes state to running.
>
> Best,
> Yangze Guo
>
> On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <[hidden email]> wrote:
> > Hi there,
> >
> > Sorry for the belated reply. I just made a preliminary investigation of the impact of refactoring IntermediateResultPartitionID.
> >
> > The key change is to compose it of an IntermediateDataSetID and a partitionNum.
> > public class IntermediateResultPartitionID {
> >     private final IntermediateDataSetID intermediateDataSetID;
> >     private final int partitionNum;
> > }
> >
> > In this benchmark, I use examples/streaming/WordCount.jar as the test job and run Flink on Yarn. All configurations are kept at their defaults except for "taskmanager.numberOfTaskSlots", which is set to 2.
> >
> > The result shows it does have an impact on performance.
> > - After this change, the maximum parallelism went from 760 to 685, which is limited by the total number of network buffers. For the same parallelism, users need more network buffers than before.
> > - The Netty messages "PartitionRequest" and "TaskEventRequest" increase by 4 bytes. For "PartitionRequest", that is a 7% increase.
> > - The TDD takes longer to construct. With a parallelism of 600, it takes twice as long to construct the TDD as before.
> >
> > Details are recorded in
> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> >
> > The same issue could occur with ExecutionAttemptID, which would increase "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex and attemptNumber). But it may not affect the TDD as much as IntermediateResultPartitionID, since there is only one ExecutionAttemptID per TDD.
> >
> > After this preliminary investigation, I tend not to refactor ExecutionAttemptID and IntermediateResultPartitionID at the moment, or to treat it as future work.
> >
> > WDYT? @ZhuZhu @Till
> >
> > Best,
> > Yangze Guo
|
Thanks for doing this benchmark, @Yangze Guo.

The result looks promising, so I would +1 to refactoring ExecutionAttemptID and IntermediateResultPartitionID.

Regarding why "the size of TDD after serialization become smaller than before": I guess it's because the new IntermediateResultPartitionIDs can share the same IntermediateDataSetID. In this way, the space taken by each IntermediateResultPartitionID is a reference (to the IntermediateDataSetID) and an int (the index), which is smaller than the 2 longs of an AbstractID.

Thanks,
Zhu Zhu
> > > > > > > > > > > > > - Log the ID’s lineage information to make > debugging more > > > > > > > > convenient. > > > > > > > > > > > > > Currently, the log fails to always show the lineage > > > > > > information > > > > > > > > > > > > > between IDs. Finding out relationships between > entities > > > > > > > > identified by > > > > > > > > > > > > > given IDs is a common demand, e.g., slot of which > > > > > > AllocationID is > > > > > > > > > > > > > assigned to satisfy slot request of with > SlotRequestID. > > > > > > Absence > > > > > > > > of > > > > > > > > > > > > > such lineage information, it’s impossible to track > the end > > > > > > to end > > > > > > > > > > > > > lifecycle of an Execution or a Task now, which > makes > > > > > > debugging > > > > > > > > > > > > > difficult. > > > > > > > > > > > > > > > > > > > > > > > > > > Key changes proposed in the FLIP are as follows: > > > > > > > > > > > > > > > > > > > > > > > > > > - Add location information to distributed > components > > > > > > > > > > > > > - Add topology information to graph components > > > > > > > > > > > > > - Log the ID’s lineage information > > > > > > > > > > > > > - Expose the identifier of distributing component > to user > > > > > > > > > > > > > > > > > > > > > > > > > > Please find more details in the FLIP wiki document > [1]. > > > > > > Looking > > > > > > > > > > forward > > > > > > > > > > > > to > > > > > > > > > > > > > your feedbacks. 
> > > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521 > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > Yangze Guo > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > > > Konstantin Knauf | Head of Product > > > > > > > > > > > > > > > > > > > > > > +49 160 91394525 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Follow us @VervericaData Ververica < > https://www.ververica.com/ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > > > Join Flink Forward <https://flink-forward.org/> - The > Apache > > > > > > Flink > > > > > > > > > > > Conference > > > > > > > > > > > > > > > > > > > > > > Stream Processing | Event Driven | Real Time > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, > Germany > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > Ververica GmbH > > > > > > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > > > > > > > > > > Managing Directors: Timothy Alexander Steinert, Yip > Park Tung > > > > > > Jason, > > > > > > > > Ji > > > > > > > > > > > (Tony) Cheng > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
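The suggestion quoted above (let the user export RESOURCE_ID and have the TaskManager respect it) can be pictured with a minimal sketch. This is not Flink's actual implementation: the class `ResourceIdSketch` and the method `resolveResourceId` are hypothetical names, and the random-UUID fallback is an assumption based on the remark that `TaskManagerRunner` currently generates a UUID.

```java
import java.util.Map;
import java.util.UUID;

/** Sketch only: picks a TaskManager resource ID from the environment, else a random one. */
final class ResourceIdSketch {

    /** Name of the environment variable suggested in the thread. */
    static final String ENV_KEY = "RESOURCE_ID";

    /**
     * Returns the user-supplied ID if the variable is set and non-blank,
     * otherwise a random UUID string (the fallback is an assumption of this sketch).
     */
    static String resolveResourceId(Map<String, String> env) {
        String fromEnv = env.get(ENV_KEY);
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            return fromEnv.trim();
        }
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // On Kubernetes the variable could be populated with the pod name.
        System.out.println("resourceId = " + resolveResourceId(System.getenv()));
    }
}
```

As noted in the thread, uniqueness across TaskManagers would remain the user's responsibility with this approach.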
This is good news, Yangze. Decreasing the size of our IDs is a really nice side effect :-)

Hence, +1 from my side as well.

Cheers,
Till

On Tue, Apr 14, 2020 at 9:54 AM Zhu Zhu <[hidden email]> wrote:

> Thanks for doing this benchmark, @Yangze Guo <[hidden email]>.
> The result looks promising, so I would +1 to refactor ExecutionAttemptID and IntermediateResultPartitionID.
>
> Regarding why "the size of TDD after serialization became smaller than before": I guess it's because the new IntermediateResultPartitionIDs can share the same IntermediateDataSetID. In this way, an IntermediateResultPartitionID takes the space of a reference (to the IntermediateDataSetID) plus an int (the index), which is smaller than the 2 longs of an AbstractID.
>
> Thanks,
> Zhu Zhu
>
> On Tue, Apr 14, 2020 at 3:09 PM Yangze Guo <[hidden email]> wrote:
>
> > Hi everyone,
> >
> > I've investigated the effect with higher-parallelism jobs.
> >
> > The result shows:
> > - The size of the TDD after serialization became smaller than before. I also did not meet any issue with the Akka framework when the parallelism was set to 6000.
> > - There was no significant difference regarding the end-to-end schedule time, job runtime, young GC count and total full GC time.
> >
> > For details, please take a look at
> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> >
> > From my perspective, I think it might be OK to refactor ExecutionAttemptID and IntermediateResultPartitionID. If you have further concerns or think we should investigate further, please let me know.
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <[hidden email]> wrote:
> > >
> > > Hi everyone,
> > > After an offline discussion with ZhuZhu, I have some comments on this investigation.
> > >
> > > Regarding the maximum parallelism going from 760 to 685, it may be because the tasks are not scheduled evenly. The result is inconsistent in multiple experiments.
> > > So, this phenomenon would be irrelevant to our changes.
> > >
> > > I think what we really care about is the framesize for Akka (should the user enlarge it after our change for the same job?). The size of the TDD after serialization seems to be smaller after the change. I don't know the root cause of this phenomenon at the moment. The way I measure it is:
> > > ```
> > > // Serialize the deployment object (the TDD) with Java serialization and log its size in bytes.
> > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > > ObjectOutputStream oos = new ObjectOutputStream(bos);
> > > oos.writeObject(deployment);
> > > oos.flush();
> > > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
> > > ```
> > > Please correct me if I'm wrong.
> > >
> > > I'll experiment with higher parallelism to see if there is any regression regarding the Akka framesize.
> > >
> > > Regarding the TDD building time, the parallelism in my investigation might be too small. Meanwhile, this time might be influenced by many factors. Thus, I'll
> > > - experiment with higher parallelism.
> > > - measure the time spent from "Starting scheduling" to the last task changing state to running.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <[hidden email]> wrote:
> > > >
> > > > Hi there,
> > > >
> > > > Sorry for the belated reply. I just made a preliminary investigation of the effect of refactoring IntermediateResultPartitionID.
> > > >
> > > > The key change is making it composed of an IntermediateDataSetID and a partitionNum.
> > > > public class IntermediateResultPartitionID {
> > > >     private final IntermediateDataSetID intermediateDataSetID;
> > > >     private final int partitionNum;
> > > > }
> > > >
> > > > In this benchmark, I use examples/streaming/WordCount.jar as the test job and run Flink on Yarn. All the configurations are kept default except for "taskmanager.numberOfTaskSlots", which is set to 2.
> > > >
> > > > The result shows it does have an impact on performance.
> > > > - After this change, the maximum parallelism went from 760 to 685, which is limited by the total number of network buffers. For the same parallelism, the user needs more network buffers than before.
> > > > - The netty messages "PartitionRequest" and "TaskEventRequest" increase by 4 bytes. For "PartitionRequest", this means a 7% increase.
> > > > - The TDD takes longer to construct. With a parallelism of 600, it takes twice as long to construct the TDD as before.
> > > >
> > > > Details are recorded in
> > > > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> > > >
> > > > The same issue could happen with ExecutionAttemptID, which will increase "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex and attemptNumber). But it may not affect the TDD as much as IntermediateResultPartitionID, since there is only one ExecutionAttemptID per TDD.
> > > >
> > > > After that preliminary investigation, I tend to not refactor ExecutionAttemptID and IntermediateResultPartitionID at the moment, or to treat it as future work.
> > > >
> > > > WDYT? @ZhuZhu @Till
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <[hidden email]> wrote:
> > > > >
> > > > > >> However, it seems the JobVertexID is derived from hashcode ...
> > > > > You are right. JobVertexID is widely used and reworking it may affect the public interfaces, e.g. the REST API. We can take it as a long-term goal and exclude it from this FLIP.
> > > > > The same applies to IntermediateDataSetID, which can also be composed of a JobID and an index, as Till proposed.
> > > > > > > > > > Thanks, > > > > > Zhu Zhu > > > > > > > > > > Till Rohrmann <[hidden email]> 于2020年3月31日周二 下午8:36写道: > > > > > > > > > > > For the IntermediateDataSetID I was just thinking that it might > > actually be > > > > > > interesting to know which job produced the result which, by using > > cluster > > > > > > partitions, could be used across different jobs. Not saying that > > we have to > > > > > > do it, though. > > > > > > > > > > > > A small addition to Zhu Zhu's comment about TDD sizes: For the > > problem with > > > > > > too large TDDs there is already an issue [1]. The current > > suspicion is that > > > > > > the size of TDDs for jobs with a large parallelism can indeed > > become > > > > > > problematic for Flink. Hence, it would be great to investigate > the > > impacts > > > > > > of the proposed changes. > > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069 > > > > > > > > > > > > Cheers, > > > > > > Till > > > > > > > > > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <[hidden email]> > > wrote: > > > > > > > > > > > > > Hi, Zhu, > > > > > > > > > > > > > > Thanks for the feedback. > > > > > > > > > > > > > > > make JobVertexID a composition of JobID and a topology index > > > > > > > I think it is a good idea. However, it seems the JobVertexID is > > > > > > > derived from hashcode which used to identify them across > > submission. > > > > > > > I'm not familiar with that component though. I prefer to keep > > this > > > > > > > idea out the scope of this FLIP if no one could help us to > > figure it > > > > > > > out. > > > > > > > > > > > > > > > How about we still keep IntermediateDataSetID independent > from > > > > > > > JobVertexID, > > > > > > > > but just print the producing relationships in logs? I think > > keeping > > > > > > > > IntermediateDataSetID independent may be better considering > > the cross > > > > > > job > > > > > > > > result usages in interactive query cases. 
> > > > > > > I think you are right. I'll keep IntermediateDataSetID > > independent > > > > > > > from JobVertexID. > > > > > > > > > > > > > > > The new IDs will become larger with this rework. > > > > > > > Yes, I also have the same concern. Benchmark is necessary, I'll > > try to > > > > > > > provide one during the implementation phase. > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > Yangze Guo > > > > > > > > > > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <[hidden email]> > > wrote: > > > > > > > > > > > > > > > > Thanks for proposing this improvement Yangze. Big +1 for the > > overall > > > > > > > > proposal. It can help a lot in troubleshooting. > > > > > > > > > > > > > > > > Here are a few questions for it: > > > > > > > > 1. Shall we make JobVertexID a composition of JobID and a > > topology > > > > > > index? > > > > > > > > This would help in the session cluster case, so that we can > > identify > > > > > > > which > > > > > > > > tasks are from which jobs along with the rework of > > ExecutionAttemptID. > > > > > > > > > > > > > > > > 2. You mentioned that "Add the producer info to the string > > literal of > > > > > > > > IntermediateDataSetID". Do you mean to make > > IntermediateDataSetID a > > > > > > > > composition of JobVertexID and a consumer index? > > > > > > > > How about we still keep IntermediateDataSetID independent > from > > > > > > > JobVertexID, > > > > > > > > but just print the producing relationships in logs? I think > > keeping > > > > > > > > IntermediateDataSetID independent may be better considering > > the cross > > > > > > job > > > > > > > > result usages in interactive query cases. > > > > > > > > > > > > > > > > 3. The new IDs will become larger with this rework. The > > > > > > > > TaskDeploymentDescriptor can become much larger since it is > > mainly > > > > > > > composed > > > > > > > > of a variety DIs. 
I'm not sure how much it would be but there > > can be > > > > > > more > > > > > > > > memory and CPU cost for it, and results in more frequent GCs, > > message > > > > > > > size > > > > > > > > exceeding akka frame limits, and a longer blocked time of > main > > thread. > > > > > > > > This should not be a problem in most cases but might be a > > problem for > > > > > > > large > > > > > > > > scale jobs. Shall we have an benchmark for it? > > > > > > > > > > > > > > > > Thanks, > > > > > > > > Zhu Zhu > > > > > > > > > > > > > > > > Yangze Guo <[hidden email]> 于2020年3月31日周二 下午2:19写道: > > > > > > > > > > > > > > > > > Thank you all for the feedback! Sorry for the belated > reply. > > > > > > > > > > > > > > > > > > @Till > > > > > > > > > I'm +1 for your two ideas and I'd like to move these two > out > > of the > > > > > > > > > scope of this FLIP since the pipelined region scheduling is > > an > > > > > > ongoing > > > > > > > > > work now. > > > > > > > > > I also agree that we should not make the InstanceID in > > > > > > > > > TaskExecutorConnection being composed of the ResourceID > plus > > a > > > > > > > > > monotonically increasing value. Thanks a lot for your > > explanation. > > > > > > > > > > > > > > > > > > @Konstantin @Yang > > > > > > > > > Regarding the PodName of TaskExecutor on K8s, I second > Yang's > > > > > > > > > suggestion. It makes sense to me to let user export > > RESOURCE_ID and > > > > > > > > > make TM respect it. User needs to guarantee there is no > > collision for > > > > > > > > > different TM. 
> > > > > > > > > > > > > > > > > > Best, > > > > > > > > > Yangze Guo > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu < > > [hidden email]> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > +1 on allowing user defined resourceId for taskmanager > > > > > > > > > > > > > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang < > > [hidden email]> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi Konstantin, > > > > > > > > > > > > > > > > > > > > > > I think it is a good idea. Currently, our users also > > report a > > > > > > > similar > > > > > > > > > issue > > > > > > > > > > > with > > > > > > > > > > > resourceId of standalone cluster. When we start a > > standalone > > > > > > > cluster > > > > > > > > > now, > > > > > > > > > > > the `TaskManagerRunner` always generates a uuid for the > > > > > > > resourceId. It > > > > > > > > > will > > > > > > > > > > > be used to register to the jobmanager and not > convenient > > to match > > > > > > > with > > > > > > > > > the > > > > > > > > > > > real > > > > > > > > > > > taskmanager, especially in container environment. > > > > > > > > > > > > > > > > > > > > > > I think a probably solution is we could support the > user > > defined > > > > > > > > > > > resourceId. > > > > > > > > > > > We could get it from the environment. For standalone on > > K8s, we > > > > > > > could > > > > > > > > > set > > > > > > > > > > > the "RESOURCE_ID" env to the pod name so that it is > > easier to > > > > > > > match the > > > > > > > > > > > taskmanager with K8s pod. > > > > > > > > > > > > > > > > > > > > > > Moreover, i am afraid we could not set the pod name to > > the > > > > > > > resourceId. > > > > > > > > > I > > > > > > > > > > > think > > > > > > > > > > > you could set the "deployment.meta.name". 
Since the > pod > > name is > > > > > > > > > generated > > > > > > > > > > > by > > > > > > > > > > > K8s in the pattern > > {deployment.meta.nane}-{rc.uuid}-{uuid}. On > > > > > > the > > > > > > > > > > > contrary, we > > > > > > > > > > > will set the resourceId to the pod name. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > Yang > > > > > > > > > > > > > > > > > > > > > > Konstantin Knauf <[hidden email]> > > 于2020年3月29日周日 > > > > > > > 下午8:06写道: > > > > > > > > > > > > > > > > > > > > > > > Hi Yangze, Hi Till, > > > > > > > > > > > > > > > > > > > > > > > > thanks you for working on this topic. I believe it > > will make > > > > > > > > > debugging > > > > > > > > > > > > large Apache Flink deployments much more feasible. > > > > > > > > > > > > > > > > > > > > > > > > I was wondering whether it would make sense to allow > > the user > > > > > > to > > > > > > > > > specify > > > > > > > > > > > > the Resource ID in standalone setups? For example, > > many users > > > > > > > still > > > > > > > > > > > > implicitly use standalone clusters on Kubernetes (the > > native > > > > > > > support > > > > > > > > > is > > > > > > > > > > > > still experimental) and in these cases it would be > > interesting > > > > > > to > > > > > > > > > also > > > > > > > > > > > set > > > > > > > > > > > > the PodName as the ResourceID. What do you think? > > > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > > > > > > > > > Kosntantin > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann < > > > > > > > [hidden email]> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi Yangze, > > > > > > > > > > > > > > > > > > > > > > > > > > thanks for creating this FLIP. 
I think it is a very > > good > > > > > > > > > improvement > > > > > > > > > > > > > helping our users and ourselves understanding > better > > what's > > > > > > > going > > > > > > > > > on in > > > > > > > > > > > > > Flink. > > > > > > > > > > > > > > > > > > > > > > > > > > Creating the ResourceIDs with host information/pod > > name is a > > > > > > > good > > > > > > > > > idea. > > > > > > > > > > > > > > > > > > > > > > > > > > Also deriving ExecutionGraph IDs from their > superset > > ID is a > > > > > > > good > > > > > > > > > idea. > > > > > > > > > > > > > > > > > > > > > > > > > > The InstanceID is used for fencing purposes. I > would > > not make > > > > > > > it a > > > > > > > > > > > > > composition of the ResourceID + a monotonically > > increasing > > > > > > > number. > > > > > > > > > The > > > > > > > > > > > > > problem is that in case of a RM failure the > > InstanceIDs would > > > > > > > start > > > > > > > > > > > from > > > > > > > > > > > > 0 > > > > > > > > > > > > > again and this could lead to collisions. > > > > > > > > > > > > > > > > > > > > > > > > > > Logging more information on how the different > > runtime IDs are > > > > > > > > > > > correlated > > > > > > > > > > > > is > > > > > > > > > > > > > also a good idea. > > > > > > > > > > > > > > > > > > > > > > > > > > Two other ideas for simplifying the ids are the > > following: > > > > > > > > > > > > > > > > > > > > > > > > > > * The SlotRequestID was introduced because the > > SlotPool was a > > > > > > > > > separate > > > > > > > > > > > > > RpcEndpoint a while ago. With this no longer being > > the case I > > > > > > > > > think we > > > > > > > > > > > > > could remove the SlotRequestID and replace it with > > the > > > > > > > > > AllocationID. 
> > > > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi > > task slots > > > > > > > one > > > > > > > > > could > > > > > > > > > > > > > derive them from the SlotRequestID used for > > requesting the > > > > > > > > > underlying > > > > > > > > > > > > > AllocatedSlot. > > > > > > > > > > > > > > > > > > > > > > > > > > Given that the slot sharing logic will most likely > be > > > > > > reworked > > > > > > > > > with the > > > > > > > > > > > > > pipelined region scheduling, we might be able to > > resolve > > > > > > these > > > > > > > two > > > > > > > > > > > points > > > > > > > > > > > > > as part of the pipelined region scheduling effort. > > > > > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > Till > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo < > > > > > > > [hidden email]> > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > > > > > > > > > > > > > We would like to start a discussion thread on > > "FLIP-118: > > > > > > > Improve > > > > > > > > > > > > > > Flink’s ID system"[1]. > > > > > > > > > > > > > > > > > > > > > > > > > > > > This FLIP mainly discusses the following issues, > > target to > > > > > > > > > enhance > > > > > > > > > > > the > > > > > > > > > > > > > > readability of IDs in log and help user to debug > > in case of > > > > > > > > > failures: > > > > > > > > > > > > > > > > > > > > > > > > > > > > - Enhance the readability of the string literals > > of IDs. > > > > > > > Most of > > > > > > > > > them > > > > > > > > > > > > > > are hashcodes, e.g. ExecutionAttemptID, which do > > not > > > > > > provide > > > > > > > much > > > > > > > > > > > > > > meaningful information and are hard to recognize > > and > > > > > > compare > > > > > > > for > > > > > > > > > > > > > > users. 
> > > > > > > > > > > > > > - Log the ID’s lineage information to make > > debugging more > > > > > > > > > convenient. > > > > > > > > > > > > > > Currently, the log fails to always show the > lineage > > > > > > > information > > > > > > > > > > > > > > between IDs. Finding out relationships between > > entities > > > > > > > > > identified by > > > > > > > > > > > > > > given IDs is a common demand, e.g., slot of which > > > > > > > AllocationID is > > > > > > > > > > > > > > assigned to satisfy slot request of with > > SlotRequestID. > > > > > > > Absence > > > > > > > > > of > > > > > > > > > > > > > > such lineage information, it’s impossible to > track > > the end > > > > > > > to end > > > > > > > > > > > > > > lifecycle of an Execution or a Task now, which > > makes > > > > > > > debugging > > > > > > > > > > > > > > difficult. > > > > > > > > > > > > > > > > > > > > > > > > > > > > Key changes proposed in the FLIP are as follows: > > > > > > > > > > > > > > > > > > > > > > > > > > > > - Add location information to distributed > > components > > > > > > > > > > > > > > - Add topology information to graph components > > > > > > > > > > > > > > - Log the ID’s lineage information > > > > > > > > > > > > > > - Expose the identifier of distributing component > > to user > > > > > > > > > > > > > > > > > > > > > > > > > > > > Please find more details in the FLIP wiki > document > > [1]. > > > > > > > Looking > > > > > > > > > > > forward > > > > > > > > > > > > > to > > > > > > > > > > > > > > your feedbacks. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521 > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > > > Yangze Guo > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > > > > > Konstantin Knauf | Head of Product > > > > > > > > > > > > > > > > > > > > > > > > +49 160 91394525 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Follow us @VervericaData Ververica < > > https://www.ververica.com/ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > > > > > Join Flink Forward <https://flink-forward.org/> - > The > > Apache > > > > > > > Flink > > > > > > > > > > > > Conference > > > > > > > > > > > > > > > > > > > > > > > > Stream Processing | Event Driven | Real Time > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, > > Germany > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > Ververica GmbH > > > > > > > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 > B > > > > > > > > > > > > Managing Directors: Timothy Alexander Steinert, Yip > > Park Tung > > > > > > > Jason, > > > > > > > > > Ji > > > > > > > > > > > > (Tony) Cheng > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
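Zhu Zhu's guess quoted above, that a partition ID made of a shared reference plus an int serializes smaller than one carrying two longs, can be checked in isolation with a small sketch. It assumes plain Java serialization, as in the quoted measurement snippet, where an object that was already written to the stream is emitted again only as a short back-reference. The classes `DataSetId`, `FlatPartitionId` and `CompositePartitionId` are hypothetical stand-ins, not Flink's real ID classes, and the numbers it prints say nothing about a real TaskDeploymentDescriptor.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

/** Sketch only: compares serialized sizes of two hypothetical ID layouts. */
final class SerializedIdSizeSketch {

    /** Stand-in for an AbstractID-style identifier: two longs. */
    static final class DataSetId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long lower;
        final long upper;

        DataSetId(long lower, long upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    /** "Before" layout: every partition ID carries its own two longs. */
    static final class FlatPartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long lower;
        final long upper;

        FlatPartitionId(long lower, long upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    /** "After" layout: a reference to a shared data set ID plus an int index. */
    static final class CompositePartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        final DataSetId dataSetId;
        final int partitionNum;

        CompositePartitionId(DataSetId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    /** Serializes the object with Java serialization and returns the byte count. */
    static int serializedSize(Serializable object) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(object);
            }
            return bos.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Size of a list of independent two-long IDs. */
    static int flatSize(int partitions) {
        ArrayList<FlatPartitionId> ids = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ids.add(new FlatPartitionId(42L, i));
        }
        return serializedSize(ids);
    }

    /** Size of a list of IDs that all point at one shared DataSetId instance. */
    static int compositeSize(int partitions) {
        DataSetId shared = new DataSetId(42L, 7L);
        ArrayList<CompositePartitionId> ids = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ids.add(new CompositePartitionId(shared, i));
        }
        return serializedSize(ids);
    }

    public static void main(String[] args) {
        int partitions = 1000;
        System.out.println("flat layout:      " + flatSize(partitions) + " bytes");
        System.out.println("composite layout: " + compositeSize(partitions) + " bytes");
    }
}
```

The saving depends on the IDs in one serialized object graph really sharing the same `DataSetId` instance; IDs serialized in separate streams, or pointing at distinct data sets, would each pay for the full referenced object.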
Thanks for the feedback, @ZhuZhu and @Till.

Thanks for the deeper analysis of the size of the TDD, @ZhuZhu.

If there are no further concerns in the next 24 hours, I'll start the vote for this FLIP.

Best,
Yangze Guo

On Wed, Apr 15, 2020 at 4:56 PM Till Rohrmann <[hidden email]> wrote:
>
> This is good news, Yangze. Decreasing the size of our IDs is a really nice side effect :-)
>
> Hence, +1 from my side as well.
>
> Cheers,
> Till
>
> On Tue, Apr 14, 2020 at 9:54 AM Zhu Zhu <[hidden email]> wrote:
>>
>> Thanks for doing this benchmark, @Yangze Guo <[hidden email]>.
>> The result looks promising, so I would +1 to refactor ExecutionAttemptID and IntermediateResultPartitionID.
>>
>> Regarding why "the size of TDD after serialization became smaller than before": I guess it's because the new IntermediateResultPartitionIDs can share the same IntermediateDataSetID. In this way, an IntermediateResultPartitionID takes the space of a reference (to the IntermediateDataSetID) plus an int (the index), which is smaller than the 2 longs of an AbstractID.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Tue, Apr 14, 2020 at 3:09 PM Yangze Guo <[hidden email]> wrote:
>>
>> > Hi everyone,
>> >
>> > I've investigated the effect with higher-parallelism jobs.
>> >
>> > The result shows:
>> > - The size of the TDD after serialization became smaller than before. I also did not meet any issue with the Akka framework when the parallelism was set to 6000.
>> > - There was no significant difference regarding the end-to-end schedule time, job runtime, young GC count and total full GC time.
>> >
>> > For details, please take a look at
>> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
>> >
>> > From my perspective, I think it might be OK to refactor ExecutionAttemptID and IntermediateResultPartitionID. If you have further concerns or think we should investigate further, please let me know.
>> >
>> > Best,
>> > Yangze Guo
>> >
>> > On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <[hidden email]> wrote:
>> > >
>> > > Hi everyone,
>> > > After an offline discussion with ZhuZhu, I have some comments on this investigation.
>> > >
>> > > Regarding the maximum parallelism going from 760 to 685, it may be because the tasks are not scheduled evenly. The result is inconsistent in multiple experiments. So, this phenomenon would be irrelevant to our changes.
>> > >
>> > > I think what we really care about is the framesize for Akka (should the user enlarge it after our change for the same job?). The size of the TDD after serialization seems to be smaller after the change. I don't know the root cause of this phenomenon at the moment. The way I measure it is:
>> > > ```
>> > > // Serialize the deployment object (the TDD) with Java serialization and log its size in bytes.
>> > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
>> > > ObjectOutputStream oos = new ObjectOutputStream(bos);
>> > > oos.writeObject(deployment);
>> > > oos.flush();
>> > > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
>> > > ```
>> > > Please correct me if I'm wrong.
>> > >
>> > > I'll experiment with higher parallelism to see if there is any regression regarding the Akka framesize.
>> > >
>> > > Regarding the TDD building time, the parallelism in my investigation might be too small. Meanwhile, this time might be influenced by many factors. Thus, I'll
>> > > - experiment with higher parallelism.
>> > > - measure the time spent from "Starting scheduling" to the last task changing state to running.
>> > >
>> > > Best,
>> > > Yangze Guo
>> > >
>> > >
>> > > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <[hidden email]> wrote:
>> > > >
>> > > > Hi there,
>> > > >
>> > > > Sorry for the belated reply. I just made a preliminary investigation of the effect of refactoring IntermediateResultPartitionID.
>> > > >
>> > > > The key change is to compose it of an IntermediateDataSetID and a partitionNum:
>> > > > public class IntermediateResultPartitionID {
>> > > >     private final IntermediateDataSetID intermediateDataSetID;
>> > > >     private final int partitionNum;
>> > > > }
>> > > >
>> > > > In this benchmark, I use examples/streaming/WordCount.jar as the test job and run Flink on Yarn. All configurations are kept at their defaults except for "taskmanager.numberOfTaskSlots", which is set to 2.
>> > > >
>> > > > The result shows that it does have an impact on performance.
>> > > > - After this change, the maximum parallelism went from 760 to 685, which is limited by the total number of network buffers. For the same parallelism, users need more network buffers than before.
>> > > > - The netty messages "PartitionRequest" and "TaskEventRequest" increase by 4 bytes. For "PartitionRequest", this means a 7% increase.
>> > > > - The TDD takes longer to construct. With a parallelism of 600, it takes twice as long to construct the TDD as before.
>> > > >
>> > > > Details are recorded in
>> > > > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
>> > > >
>> > > > The same issue could happen with ExecutionAttemptID, which would increase "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex and attemptNumber). But it may not affect the TDD as much as IntermediateResultPartitionID, since there is only one ExecutionAttemptID per TDD.
>> > > >
>> > > > After this preliminary investigation, I tend not to refactor ExecutionAttemptID and IntermediateResultPartitionID at the moment, or to treat it as future work.
>> > > >
>> > > > WDYT?
>> > > > @ZhuZhu @Till
>> > > >
>> > > > Best,
>> > > > Yangze Guo
>> > > >
>> > > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <[hidden email]> wrote:
>> > > > >
>> > > > > >> However, it seems the JobVertexID is derived from hashcode ...
>> > > > > You are right. JobVertexID is widely used, and reworking it may affect public interfaces, e.g. the REST API. We can take it as a long-term goal and exclude it from this FLIP.
>> > > > > The same applies to IntermediateDataSetID, which could also be composed of a JobID and an index, as Till proposed.
>> > > > >
>> > > > > Thanks,
>> > > > > Zhu Zhu
>> > > > >
>> > > > > Till Rohrmann <[hidden email]> wrote on Tue, Mar 31, 2020 at 8:36 PM:
>> > > > >
>> > > > > > For the IntermediateDataSetID, I was just thinking that it might actually be interesting to know which job produced the result, which, by using cluster partitions, could be used across different jobs. Not saying that we have to do it, though.
>> > > > > >
>> > > > > > A small addition to Zhu Zhu's comment about TDD sizes: there is already an issue [1] for the problem of too-large TDDs. The current suspicion is that the size of TDDs for jobs with a large parallelism can indeed become problematic for Flink. Hence, it would be great to investigate the impact of the proposed changes.
>> > > > > >
>> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069
>> > > > > >
>> > > > > > Cheers,
>> > > > > > Till
>> > > > > >
>> > > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <[hidden email]> wrote:
>> > > > > >
>> > > > > > > Hi, Zhu,
>> > > > > > >
>> > > > > > > Thanks for the feedback.
>> > > > > > >
>> > > > > > > > make JobVertexID a composition of JobID and a topology index
>> > > > > > > I think it is a good idea.
>> > > > > > > However, it seems that the JobVertexID is derived from a hashcode which is used to identify them across submissions. I'm not familiar with that component, though. I prefer to keep this idea out of the scope of this FLIP if no one can help us figure it out.
>> > > > > > >
>> > > > > > > > How about we still keep IntermediateDataSetID independent from JobVertexID, but just print the producing relationships in logs? I think keeping IntermediateDataSetID independent may be better considering the cross-job result usages in interactive query cases.
>> > > > > > > I think you are right. I'll keep IntermediateDataSetID independent from JobVertexID.
>> > > > > > >
>> > > > > > > > The new IDs will become larger with this rework.
>> > > > > > > Yes, I have the same concern. A benchmark is necessary; I'll try to provide one during the implementation phase.
>> > > > > > >
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Yangze Guo
>> > > > > > >
>> > > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <[hidden email]> wrote:
>> > > > > > > >
>> > > > > > > > Thanks for proposing this improvement, Yangze. Big +1 for the overall proposal. It can help a lot in troubleshooting.
>> > > > > > > >
>> > > > > > > > Here are a few questions about it:
>> > > > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology index? This would help in the session cluster case, so that we can identify which tasks are from which jobs, along with the rework of ExecutionAttemptID.
>> > > > > > > >
>> > > > > > > > 2. You mentioned that "Add the producer info to the string literal of IntermediateDataSetID".
>> > > > > > > > Do you mean to make IntermediateDataSetID a composition of JobVertexID and a consumer index?
>> > > > > > > > How about we still keep IntermediateDataSetID independent from JobVertexID, but just print the producing relationships in logs? I think keeping IntermediateDataSetID independent may be better considering the cross-job result usages in interactive query cases.
>> > > > > > > >
>> > > > > > > > 3. The new IDs will become larger with this rework. The TaskDeploymentDescriptor can become much larger since it is mainly composed of a variety of IDs. I'm not sure how much larger it would be, but there can be more memory and CPU cost for it, resulting in more frequent GCs, message sizes exceeding the Akka frame limit, and a longer blocked time of the main thread. This should not be a problem in most cases but might be a problem for large-scale jobs. Shall we have a benchmark for it?
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Zhu Zhu
>> > > > > > > >
>> > > > > > > > Yangze Guo <[hidden email]> wrote on Tue, Mar 31, 2020 at 2:19 PM:
>> > > > > > > >
>> > > > > > > > > Thank you all for the feedback! Sorry for the belated reply.
>> > > > > > > > >
>> > > > > > > > > @Till
>> > > > > > > > > I'm +1 for your two ideas, and I'd like to move them out of the scope of this FLIP since the pipelined region scheduling is ongoing work.
>> > > > > > > > > I also agree that we should not make the InstanceID in TaskExecutorConnection a composition of the ResourceID plus a monotonically increasing value. Thanks a lot for your explanation.
>> > > > > > > > >
>> > > > > > > > > @Konstantin @Yang
>> > > > > > > > > Regarding the PodName of the TaskExecutor on K8s, I second Yang's suggestion. It makes sense to me to let users export RESOURCE_ID and make the TM respect it. Users need to guarantee that there is no collision between different TMs.
>> > > > > > > > >
>> > > > > > > > > Best,
>> > > > > > > > > Yangze Guo
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <[hidden email]> wrote:
>> > > > > > > > > >
>> > > > > > > > > > +1 on allowing a user-defined resourceId for the taskmanager
>> > > > > > > > > >
>> > > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <[hidden email]> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Konstantin,
>> > > > > > > > > > >
>> > > > > > > > > > > I think it is a good idea. Currently, our users also report a similar issue with the resourceId of standalone clusters. When we start a standalone cluster now, the `TaskManagerRunner` always generates a UUID for the resourceId. It is used to register at the jobmanager and is not convenient to match with the real taskmanager, especially in container environments.
>> > > > > > > > > > >
>> > > > > > > > > > > I think a possible solution is to support a user-defined resourceId. We could get it from the environment.
>> > > > > > > > > > > For standalone on K8s, we could set the "RESOURCE_ID" env to the pod name so that it is easier to match the taskmanager with the K8s pod.
>> > > > > > > > > > >
>> > > > > > > > > > > Moreover, I am afraid we cannot set the pod name to the resourceId. I think you could set the "deployment.meta.name", since the pod name is generated by K8s in the pattern {deployment.meta.name}-{rc.uuid}-{uuid}. Instead, we will do the reverse and set the resourceId to the pod name.
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Yang
>> > > > > > > > > > >
>> > > > > > > > > > > Konstantin Knauf <[hidden email]> wrote on Sun, Mar 29, 2020 at 8:06 PM:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Yangze, Hi Till,
>> > > > > > > > > > > >
>> > > > > > > > > > > > thank you for working on this topic. I believe it will make debugging large Apache Flink deployments much more feasible.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I was wondering whether it would make sense to allow the user to specify the Resource ID in standalone setups? For example, many users still implicitly use standalone clusters on Kubernetes (the native support is still experimental), and in these cases it would be interesting to also set the PodName as the ResourceID. What do you think?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Cheers,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Konstantin
|
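Zhu Zhu's guess about why the serialized TDD shrinks can be illustrated with a minimal, self-contained sketch. This is not Flink code: `RandomId` and `PartitionId` are hypothetical stand-ins for an AbstractID-style identifier (two longs) and for the proposed composite IntermediateResultPartitionID, and the list sizes are arbitrary. The measurement uses the same `ObjectOutputStream` approach quoted in the thread; whether the real TDD behaves the same way depends on its actual contents.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.UUID;

// Hypothetical stand-ins for illustration only; these are not Flink classes.
class IdSizeSketch {

    /** Stand-in for an AbstractID-style identifier: two random longs. */
    static final class RandomId implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long lowerPart;
        private final long upperPart;

        RandomId() {
            UUID uuid = UUID.randomUUID();
            this.lowerPart = uuid.getLeastSignificantBits();
            this.upperPart = uuid.getMostSignificantBits();
        }
    }

    /** Stand-in for the composite ID: a shared data set ID plus an int index. */
    static final class PartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        private final RandomId dataSetId;
        private final int partitionNum;

        PartitionId(RandomId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    /** Java-serializes the object and returns the number of bytes written. */
    static int serializedSize(Serializable object) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(object);
            }
            return bos.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serialized size of a list of independent two-long IDs. */
    static int sizeOfRandomIds(int count) {
        ArrayList<RandomId> ids = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            ids.add(new RandomId());
        }
        return serializedSize(ids);
    }

    /** Serialized size of a list of composite IDs that all share one data set ID. */
    static int sizeOfCompositeIds(int count) {
        RandomId shared = new RandomId();
        ArrayList<PartitionId> ids = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            ids.add(new PartitionId(shared, i));
        }
        return serializedSize(ids);
    }

    public static void main(String[] args) {
        int count = 1000;
        System.out.println("independent IDs: " + sizeOfRandomIds(count) + " bytes");
        System.out.println("composite IDs:   " + sizeOfCompositeIds(count) + " bytes");
    }
}
```

In this sketch the shared `RandomId` is written once, and every later occurrence within the same stream is a back-reference, so each composite ID costs a reference plus an int rather than two longs.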
To keep the VOTE thread clean, I am forwarding @Zhijiang's reminder here:
> Sorry for not being involved in the previous discussion. In general, I like the proposed direction of making related IDs carry richer information for debugging and correlation.
> But I have a small reminder regarding the changes. After a failover restart, every related ID is currently regenerated randomly and thus differs from before. I am not sure whether this was an intentional design, but it is the ground truth now.
> For example, this applies to the ExecutionAttemptID, and the ResultPartitionID is also derived from it to guarantee uniqueness after failover. Based on the current implementation, it seems that we do not store or rely on the previous ID state after failover, but I am not sure whether this assumption will remain valid for future features.
> Anyway, luckily the proposed changes in this FLIP do not break this existing behavior, thanks to introducing the `attemptNumber` in `ExecutionAttemptID`, so we do not need to consider this issue further for now.

Thanks for the reminder. I agree that the change should not break the failover-related rule. If some contracts in the future need different IDs across failover, we need to rethink the structure of those IDs.

Best,
Yangze Guo

On Wed, Apr 15, 2020 at 5:09 PM Yangze Guo <[hidden email]> wrote:
>
> Thanks for the feedback, @ZhuZhu and @Till
> Thanks for the deeper analysis of the size of the TDD, @ZhuZhu.
>
> If there are no further concerns in the next 24 hours, I'll start the vote for this FLIP.
>
> Best,
> Yangze Guo
>
> On Wed, Apr 15, 2020 at 4:56 PM Till Rohrmann <[hidden email]> wrote:
> >
> > This is good news, Yangze. Decreasing the size of our IDs is a really nice side effect :-)
> >
> > Hence, +1 from my side as well.
> >
> > Cheers,
> > Till
> >
> > On Tue, Apr 14, 2020 at 9:54 AM Zhu Zhu <[hidden email]> wrote:
> >>
> >> Thanks for doing this benchmark, @Yangze Guo <[hidden email]>.
> >> The result looks promising, so I would +1 to refactoring ExecutionAttemptID and IntermediateResultPartitionID.
> >> > >> Regarding why 'The size of TDD after serialization become smaller than > >> before', I guess it's because the new IntermediateResultPartitionIDs can > >> share the same IntermediateDataSetID, in this way the space of > >> IntermediateResultPartitionID is a ref (to IntermediateDataSetID) and an > >> int (index), which is smaller than 2 Longs (AbstractID). > >> > >> Thanks, > >> Zhu Zhu > >> > >> Yangze Guo <[hidden email]> 于2020年4月14日周二 下午3:09写道: > >> > >> > Hi everyone, > >> > > >> > I've investigated the infect with higher parallelism jobs. > >> > > >> > The result shows: > >> > - The size of TDD after serialization become smaller than before. > >> > While I did not meet any issue with Akka framework when the > >> > parallelism set to 6000. > >> > - There was no significant difference regarding the end to end > >> > schedule time, job runtime, young gc count and total full gc time. > >> > > >> > For details, please take a look at > >> > > >> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing > >> > . > >> > > >> > From my perspective, I think it might be ok to refactor > >> > ExecutionAttemptID and IntermediateResultPartitionID. If you have > >> > further concerns or think we should make further investigation. Please > >> > let me know. > >> > > >> > Best, > >> > Yangze Guo > >> > > >> > On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <[hidden email]> wrote: > >> > > > >> > > Hi everyone, > >> > > After an offline discussion with ZhuZhu, I have some comments on this > >> > > investigation. > >> > > > >> > > Regarding the maximum parallelism went from 760 to 685, it may because > >> > > of that the tasks are not scheduled evenly. The result is inconsistent > >> > > in multiple experiments. So, this phenomenon would be irrelevant to > >> > > our changes. > >> > > > >> > > I think what we really care about is the framesize for Akka(Should > >> > > user enlarge it after our change for the same job). 
The size of TDD > >> > > after serialization seems to be smaller after change. I don't know the > >> > > root reason of this phenomenon at the moment. The way I measure it is: > >> > > ``` > >> > > ByteArrayOutputStream bos = new ByteArrayOutputStream(); > >> > > ObjectOutputStream oos = new ObjectOutputStream(bos); > >> > > oos.writeObject(deployment); > >> > > oos.flush(); > >> > > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length); > >> > > ``` > >> > > Please correct me if I'm wrong. > >> > > > >> > > I'll experiment with higher parallelism to see if there is any > >> > > regression regarding Akka framesize. > >> > > > >> > > Regarding the TDD building time, the parallelism in my investigation > >> > > might be too small. Meanwhile, this time might be influence by many > >> > > factors. Thus, I'll > >> > > - experiment with higher parallelism. > >> > > - measure the time spent from "Starting scheduling" to the last task > >> > > change state to running. > >> > > > >> > > Best, > >> > > Yangze Guo > >> > > > >> > > > >> > > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <[hidden email]> wrote: > >> > > > > >> > > > Hi there, > >> > > > > >> > > > Sorry for the belated reply. I just make a preliminary investigation > >> > > > of the infect of refactoring IntermediateResultPartitionID. > >> > > > > >> > > > The key change is making it being composed of IntermediateDataSetID > >> > > > and a partitionNum. > >> > > > public class IntermediateResultPartitionID { > >> > > > private final IntermediateDataSetID intermediateDataSetID; > >> > > > private final int partitionNum; > >> > > > } > >> > > > > >> > > > In this benchmark, I use examples/streaming/WordCount.jar as the test > >> > > > job and run Flink on Yarn. All the configurations are kept default > >> > > > except for "taskmanager.numberOfTaskSlots", which is set to 2. > >> > > > > >> > > > The result shows it does have an impact on performance. 
> >> > > > - After this change, the maximum parallelism went from 760 to 685, > >> > > > which limited by the total number of network buffers. For the same > >> > > > parallelism, user needs more network buffer than before. > >> > > > - The netty message "PartitionRequest" and "TaskEventRequest" increase > >> > > > by 4 bytes. For "PartitionRequest", it means 7% increase. > >> > > > - The TDD takes longer to construct. With 600 parallelisms, it takes > >> > > > twice as long to construct TDD than before. > >> > > > > >> > > > Details record in > >> > > > > >> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing > >> > > > > >> > > > The same issue could happen in ExecutionAttemptID, which will increase > >> > > > the "PartitionRequest" and "TaskEventRequest" by 8 bytes(subtaskIndex > >> > > > and attemptNumber). But it may not infect the TDD as much as > >> > > > IntermediateResultPartitionID, since there is only one > >> > > > ExecutionAttemptID per TDD. > >> > > > > >> > > > After that preliminary investigation, I tend to not refactor > >> > > > ExecutionAttemptID and IntermediateResultPartitionID at the moment or > >> > > > treat it as future work. > >> > > > > >> > > > WDYT? @ZhuZhu @Till > >> > > > > >> > > > Best, > >> > > > Yangze Guo > >> > > > > >> > > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <[hidden email]> wrote: > >> > > > > > >> > > > > >> However, it seems the JobVertexID is derived from hashcode ... > >> > > > > You are right. JobVertexID is widely used and reworking it may > >> > affect the > >> > > > > public > >> > > > > interfaces, e.g. REST api. We can take it as a long term goal and > >> > exclude > >> > > > > it from this FLIP. > >> > > > > This same applies to IntermediateDataSetID, which can be also > >> > composed of a > >> > > > > JobID > >> > > > > and an index as Till proposed. 
> >> > > > > > >> > > > > Thanks, > >> > > > > Zhu Zhu > >> > > > > > >> > > > > Till Rohrmann <[hidden email]> 于2020年3月31日周二 下午8:36写道: > >> > > > > > >> > > > > > For the IntermediateDataSetID I was just thinking that it might > >> > actually be > >> > > > > > interesting to know which job produced the result which, by using > >> > cluster > >> > > > > > partitions, could be used across different jobs. Not saying that > >> > we have to > >> > > > > > do it, though. > >> > > > > > > >> > > > > > A small addition to Zhu Zhu's comment about TDD sizes: For the > >> > problem with > >> > > > > > too large TDDs there is already an issue [1]. The current > >> > suspicion is that > >> > > > > > the size of TDDs for jobs with a large parallelism can indeed > >> > become > >> > > > > > problematic for Flink. Hence, it would be great to investigate the > >> > impacts > >> > > > > > of the proposed changes. > >> > > > > > > >> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069 > >> > > > > > > >> > > > > > Cheers, > >> > > > > > Till > >> > > > > > > >> > > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <[hidden email]> > >> > wrote: > >> > > > > > > >> > > > > > > Hi, Zhu, > >> > > > > > > > >> > > > > > > Thanks for the feedback. > >> > > > > > > > >> > > > > > > > make JobVertexID a composition of JobID and a topology index > >> > > > > > > I think it is a good idea. However, it seems the JobVertexID is > >> > > > > > > derived from hashcode which used to identify them across > >> > submission. > >> > > > > > > I'm not familiar with that component though. I prefer to keep > >> > this > >> > > > > > > idea out the scope of this FLIP if no one could help us to > >> > figure it > >> > > > > > > out. > >> > > > > > > > >> > > > > > > > How about we still keep IntermediateDataSetID independent from > >> > > > > > > JobVertexID, > >> > > > > > > > but just print the producing relationships in logs? 
I think > >> > keeping > >> > > > > > > > IntermediateDataSetID independent may be better considering > >> > the cross > >> > > > > > job > >> > > > > > > > result usages in interactive query cases. > >> > > > > > > I think you are right. I'll keep IntermediateDataSetID > >> > independent > >> > > > > > > from JobVertexID. > >> > > > > > > > >> > > > > > > > The new IDs will become larger with this rework. > >> > > > > > > Yes, I also have the same concern. Benchmark is necessary, I'll > >> > try to > >> > > > > > > provide one during the implementation phase. > >> > > > > > > > >> > > > > > > > >> > > > > > > Best, > >> > > > > > > Yangze Guo > >> > > > > > > > >> > > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <[hidden email]> > >> > wrote: > >> > > > > > > > > >> > > > > > > > Thanks for proposing this improvement Yangze. Big +1 for the > >> > overall > >> > > > > > > > proposal. It can help a lot in troubleshooting. > >> > > > > > > > > >> > > > > > > > Here are a few questions for it: > >> > > > > > > > 1. Shall we make JobVertexID a composition of JobID and a > >> > topology > >> > > > > > index? > >> > > > > > > > This would help in the session cluster case, so that we can > >> > identify > >> > > > > > > which > >> > > > > > > > tasks are from which jobs along with the rework of > >> > ExecutionAttemptID. > >> > > > > > > > > >> > > > > > > > 2. You mentioned that "Add the producer info to the string > >> > literal of > >> > > > > > > > IntermediateDataSetID". Do you mean to make > >> > IntermediateDataSetID a > >> > > > > > > > composition of JobVertexID and a consumer index? > >> > > > > > > > How about we still keep IntermediateDataSetID independent from > >> > > > > > > JobVertexID, > >> > > > > > > > but just print the producing relationships in logs? I think > >> > keeping > >> > > > > > > > IntermediateDataSetID independent may be better considering > >> > the cross > >> > > > > > job > >> > > > > > > > result usages in interactive query cases. 
> >> > > > > > > > > >> > > > > > > > 3. The new IDs will become larger with this rework. The > >> > > > > > > > TaskDeploymentDescriptor can become much larger since it is > >> > mainly > >> > > > > > > composed > >> > > > > > > > of a variety DIs. I'm not sure how much it would be but there > >> > can be > >> > > > > > more > >> > > > > > > > memory and CPU cost for it, and results in more frequent GCs, > >> > message > >> > > > > > > size > >> > > > > > > > exceeding akka frame limits, and a longer blocked time of main > >> > thread. > >> > > > > > > > This should not be a problem in most cases but might be a > >> > problem for > >> > > > > > > large > >> > > > > > > > scale jobs. Shall we have an benchmark for it? > >> > > > > > > > > >> > > > > > > > Thanks, > >> > > > > > > > Zhu Zhu > >> > > > > > > > > >> > > > > > > > Yangze Guo <[hidden email]> 于2020年3月31日周二 下午2:19写道: > >> > > > > > > > > >> > > > > > > > > Thank you all for the feedback! Sorry for the belated reply. > >> > > > > > > > > > >> > > > > > > > > @Till > >> > > > > > > > > I'm +1 for your two ideas and I'd like to move these two out > >> > of the > >> > > > > > > > > scope of this FLIP since the pipelined region scheduling is > >> > an > >> > > > > > ongoing > >> > > > > > > > > work now. > >> > > > > > > > > I also agree that we should not make the InstanceID in > >> > > > > > > > > TaskExecutorConnection being composed of the ResourceID plus > >> > a > >> > > > > > > > > monotonically increasing value. Thanks a lot for your > >> > explanation. > >> > > > > > > > > > >> > > > > > > > > @Konstantin @Yang > >> > > > > > > > > Regarding the PodName of TaskExecutor on K8s, I second Yang's > >> > > > > > > > > suggestion. It makes sense to me to let user export > >> > RESOURCE_ID and > >> > > > > > > > > make TM respect it. User needs to guarantee there is no > >> > collision for > >> > > > > > > > > different TM. 
> >> > > > > > > > >
> >> > > > > > > > > Best,
> >> > > > > > > > > Yangze Guo
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <[hidden email]> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > +1 on allowing user defined resourceId for taskmanager
> >> > > > > > > > > >
> >> > > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <[hidden email]> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > Hi Konstantin,
> >> > > > > > > > > > >
> >> > > > > > > > > > > I think it is a good idea. Currently, our users also report a
> >> > > > > > > > > > > similar issue with the resourceId of standalone clusters. When we
> >> > > > > > > > > > > start a standalone cluster now, the `TaskManagerRunner` always
> >> > > > > > > > > > > generates a uuid for the resourceId. It is used to register at
> >> > > > > > > > > > > the jobmanager and is not convenient to match with the real
> >> > > > > > > > > > > taskmanager, especially in container environments.
> >> > > > > > > > > > >
> >> > > > > > > > > > > I think a probable solution is that we could support a user
> >> > > > > > > > > > > defined resourceId. We could get it from the environment. For
> >> > > > > > > > > > > standalone on K8s, we could set the "RESOURCE_ID" env to the pod
> >> > > > > > > > > > > name so that it is easier to match the taskmanager with the K8s
> >> > > > > > > > > > > pod.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Moreover, I am afraid we could not set the pod name to the
> >> > > > > > > > > > > resourceId.
> >> > > > > > > > > > > I think you could set the "deployment.meta.name", since the pod
> >> > > > > > > > > > > name is generated by K8s in the pattern
> >> > > > > > > > > > > {deployment.meta.name}-{rc.uuid}-{uuid}. On the contrary, we will
> >> > > > > > > > > > > set the resourceId to the pod name.
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > > > Best,
> >> > > > > > > > > > > Yang
> >> > > > > > > > > > >
> >> > > > > > > > > > > Konstantin Knauf <[hidden email]> wrote on Sun, Mar 29, 2020 at 8:06 PM:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > Hi Yangze, Hi Till,
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > thank you for working on this topic. I believe it will make
> >> > > > > > > > > > > > debugging large Apache Flink deployments much more feasible.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I was wondering whether it would make sense to allow the user
> >> > > > > > > > > > > > to specify the Resource ID in standalone setups? For example,
> >> > > > > > > > > > > > many users still implicitly use standalone clusters on
> >> > > > > > > > > > > > Kubernetes (the native support is still experimental) and in
> >> > > > > > > > > > > > these cases it would be interesting to also set the PodName as
> >> > > > > > > > > > > > the ResourceID. What do you think?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Cheers,
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Konstantin
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <[hidden email]> wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > Hi Yangze,
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > thanks for creating this FLIP. I think it is a very good
> >> > > > > > > > > > > > > improvement helping our users and ourselves understanding
> >> > > > > > > > > > > > > better what's going on in Flink.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Creating the ResourceIDs with host information/pod name is a
> >> > > > > > > > > > > > > good idea.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Also deriving ExecutionGraph IDs from their superset ID is a
> >> > > > > > > > > > > > > good idea.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > The InstanceID is used for fencing purposes. I would not make
> >> > > > > > > > > > > > > it a composition of the ResourceID + a monotonically
> >> > > > > > > > > > > > > increasing number. The problem is that in case of a RM
> >> > > > > > > > > > > > > failure the InstanceIDs would start from 0 again and this
> >> > > > > > > > > > > > > could lead to collisions.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Logging more information on how the different runtime IDs are
> >> > > > > > > > > > > > > correlated is also a good idea.
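The user-defined resource ID suggested in the quoted discussion (read "RESOURCE_ID" from the environment, otherwise fall back to a generated UUID) could look roughly like the Java sketch below. It is only an illustration under stated assumptions: `ResourceIdResolver` and `resolve` are hypothetical names, not Flink classes, and how `TaskManagerRunner` would actually consume the value is not specified in this thread.

```java
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper for illustration only; not part of Flink's API. */
final class ResourceIdResolver {

    /** Environment variable name proposed in the thread. */
    static final String ENV_KEY = "RESOURCE_ID";

    private ResourceIdResolver() {}

    /**
     * Returns the user-supplied resource ID if one is set and non-blank,
     * otherwise a random UUID string (the fallback mirrors the behaviour
     * the thread describes for standalone setups today).
     */
    static String resolve(Map<String, String> env) {
        String configured = env.get(ENV_KEY);
        if (configured != null && !configured.trim().isEmpty()) {
            // Uniqueness across TaskManagers is the user's responsibility.
            return configured.trim();
        }
        return UUID.randomUUID().toString();
    }
}
```

Calling `ResourceIdResolver.resolve(System.getenv())` at startup would pick up a value exported by the deployment, for example the pod name on Kubernetes. Keeping the UUID fallback means setups that do not export the variable behave as before.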