Hi,
I am Hwanju at AWS Kinesis Analytics. We would like to start a discussion thread about a project we are considering to improve Flink operations in production. We would like to start the conversation early, before detailed design, so any high-level feedback would be welcome.

For service providers who operate Flink in a multi-tenant environment, such as AWS Kinesis Data Analytics, it is crucial to measure application health and to clearly differentiate application unavailability caused by the Flink framework or the service environment from unavailability caused by application code. The current Flink metrics represent overall job availability over time, but they need to be improved to give Flink operators better insight into detailed application availability. The current availability metrics, such as uptime and downtime, measure time based on the running state of a job, which does not necessarily represent the actual running state of the job (after a job transitions to running, each task still has to be scheduled and deployed before it can run user-defined functions). The detailed view should give operators visibility into 1) how long each specific stage takes (e.g., task scheduling or deployment), 2) what failure is introduced in which stage, leading to job downtime, and 3) whether such a failure is classified as a user code error (e.g., an uncaught exception from a user-defined function) or a platform/environmental error (e.g., a checkpointing issue, unhealthy nodes hosting job/task managers, a Flink bug). The last one is particularly needed to allow Flink operators to define an SLA where only a small fraction of downtime may be introduced by service faults. All of these visibility enhancements can help the community detect and fix Flink runtime issues quickly, whereby Flink can become a more robust operating system for hosting data analytics applications.

The current proposal is as follows.

Firstly, we need to account time for each stage of task execution, such as scheduling, deploying, and running, to give better visibility into how long a job spends in which stage while not running user functions.

Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by the Java exception reported to the job manager on task failure or on an unhealthy task manager (Flink already maintains a cause, but it can be associated with an execution stage for causal tracking).

Thirdly, the downtime reason should be classified as a user- or system-induced failure. This needs an exception classifier that draws the line between user-defined functions (or the public API) and the Flink runtime. It is particularly challenging to reach 100% accuracy in one shot, because classification is empirical in nature and custom logic such as serialization is injected into the runtime, so pluggable classifier filters are a must-have to enable incremental improvement.

Fourthly, stuck progress, where a task is apparently running but is not able to process data, generally manifesting itself as long backpressure, can be monitored as higher-level job availability, and the runtime can determine whether the reason for being stuck is caused by the user (e.g., under-provisioned resources, a user function bug) or by the system (a deadlock or livelock in the Flink runtime).

Finally, all the detailed tracking information and metrics are exposed via REST and Flink metrics, so that the Flink dashboard can show enhanced information about job execution/availability and operators can set alarms appropriately on the metrics.

Best,
Hwanju
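As a rough illustration of the first item of the proposal (per-stage time accounting), the sketch below accumulates wall-clock time per stage from a sequence of timestamped transitions. It is only a minimal sketch under stated assumptions: the `Stage` enum and the `StageTimeTracker` class are hypothetical names invented for this example, not existing Flink types, and "downtime" is assumed here to mean any stage other than RUNNING.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical stage names for illustration; these are not Flink's own state enums. */
enum Stage { SCHEDULING, DEPLOYING, RUNNING, FAILING, RESTARTING }

/** Accumulates the wall-clock time a job spends in each stage. */
final class StageTimeTracker {
    private final Map<Stage, Long> closedMillis = new EnumMap<>(Stage.class);
    private Stage current;          // null until the first transition
    private long enteredAtMillis;

    /** Closes the interval of the current stage and opens one for {@code next}. */
    void transitionTo(Stage next, long nowMillis) {
        if (current != null) {
            closedMillis.merge(current, nowMillis - enteredAtMillis, Long::sum);
        }
        current = next;
        enteredAtMillis = nowMillis;
    }

    /** Time spent in {@code stage}, including the still-open interval up to {@code nowMillis}. */
    long millisIn(Stage stage, long nowMillis) {
        long closed = closedMillis.getOrDefault(stage, 0L);
        return stage == current ? closed + (nowMillis - enteredAtMillis) : closed;
    }

    /** Assumption: downtime is every stage in which user functions are not running. */
    long downtimeMillis(long nowMillis) {
        long total = 0;
        for (Stage s : Stage.values()) {
            if (s != Stage.RUNNING) {
                total += millisIn(s, nowMillis);
            }
        }
        return total;
    }
}
```

Passing the timestamp in explicitly, rather than reading the clock inside the tracker, keeps the accounting deterministic and easy to check; a real implementation would also have to decide which component owns the transitions.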
Hi Hwanju,
Thanks for starting the discussion. Any improvement in this area would definitely be very helpful and valuable. Generally speaking, +1 from my side, as long as we make sure that such changes either do not add performance overhead (which I think they shouldn’t) or are optional.

> Firstly, we need to account time for each stage of task execution such as scheduling, deploying, and running, to enable better visibility of how long a job takes in which stage while not running user functions.

A couple of questions/remarks:
1. Do we currently account state restore as “RUNNING”? If yes, this might be incorrect from your perspective.
2a. This might be more tricky if various Tasks are in various stages. For example, in streaming it should be safe to assume that the state of the job is the “minimum” of its Tasks’ states, so the Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
2b. However, in batch (including DataStream jobs running against bounded data streams, like Blink SQL) this might be more tricky, since there are ongoing efforts to schedule parts of the job graph in stages. For example, do not schedule the probe side of a join until the build side is done/completed.

> Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by Java exception notified to job manager on task failure or unhealthy task manager (Flink already maintains a cause but it can be associated with an execution stage for causal tracking)

What exactly would you like to report here? A list of exceptions with the downtime caused by each, for example: exception X caused a job to be down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes in state restore?

> Thirdly, downtime reason should be classified into user- or system-induced failure. This needs exception classifier by drawing the line between user-defined functions (or public API) and Flink runtime. This is particularly challenging to have 100% accuracy at one-shot due to empirical nature and custom logic injection like serialization, so pluggable classifier filters are must-have to enable incremental improvement.

Why are you thinking about implementing classifiers? Couldn’t we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we throw the correct exception types and handle/wrap exceptions correctly when crossing the Flink system/user code border? This way we could know exactly whether an exception occurred in the user code or in Flink code.

One thing that might be tricky is if an error in Flink code is caused by the user’s mistake.

> Fourthly, stuck progress

Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least a couple of different ways. The tricky part would be to determine whether this is caused by the user or not, but probably some simple stack trace probing on the back-pressured task once every N seconds should solve this, similar to how sampling profilers work.

Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.

Piotrek
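To make the exception-type idea from this message concrete, here is a minimal sketch. Everything in it is an assumption for illustration: `FlinkUserException`, `FlinkNetworkException` and `FlinkStateBackendException` are only the example names used in the message and do not exist in Flink, and `FailureType`, `TypeBasedClassifier` and `callUserCode` are likewise hypothetical. The sketch wraps whatever a user function throws at the system/user border and then classifies a failure by the first marker type found in its cause chain.

```java
import java.util.concurrent.Callable;

/** Hypothetical marker exceptions, named after the examples in the message; not Flink classes. */
class FlinkUserException extends RuntimeException {
    FlinkUserException(String message, Throwable cause) { super(message, cause); }
}

class FlinkNetworkException extends RuntimeException {
    FlinkNetworkException(String message, Throwable cause) { super(message, cause); }
}

class FlinkStateBackendException extends RuntimeException {
    FlinkStateBackendException(String message, Throwable cause) { super(message, cause); }
}

enum FailureType { USER, NETWORK, STATE_BACKEND, UNKNOWN }

final class TypeBasedClassifier {
    private static final int MAX_CAUSE_DEPTH = 32; // guards against cyclic cause chains

    /** Returns the type of the outermost marker exception in the cause chain. */
    static FailureType classify(Throwable failure) {
        int depth = 0;
        for (Throwable t = failure; t != null && depth < MAX_CAUSE_DEPTH; t = t.getCause(), depth++) {
            if (t instanceof FlinkUserException) return FailureType.USER;
            if (t instanceof FlinkNetworkException) return FailureType.NETWORK;
            if (t instanceof FlinkStateBackendException) return FailureType.STATE_BACKEND;
        }
        return FailureType.UNKNOWN;
    }

    /** Wraps anything thrown by user code when crossing the system/user border. */
    static <T> T callUserCode(Callable<T> userFunction) {
        try {
            return userFunction.call();
        } catch (Exception e) {
            throw new FlinkUserException("user function failed", e);
        }
    }
}
```

Note the design choice hidden in `classify`: the outermost marker wins, so a state backend error raised inside a wrapped user function call would be reported as USER. Whether that is the desired attribution is exactly the kind of border case raised above.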
On 16/05/2019 11:34, Piotr Nowojski wrote:
> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.

I fully agree, and believe we should split this thread. We will end up discussing too many issues at once.

Nevertheless,

> 1. Do we currently account state restore as “RUNNING”? If yes, this might be incorrect from your perspective.

I don't believe we do. The Task state is set to running on the TM once the Invokable has been instantiated, but at that point we aren't even on the Streaming API level and hence haven't loaded anything. AFAIK this is all done in StreamTask#invoke, which is called afterwards.

> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that state of the job, is “minimum” of it’s Tasks’ states, so Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule part of the job graphs in stages. For example do not schedule probe side of the join until build side is done/completed.

I have my doubts that there's anything we can/should do here. The job state works the way it does; I'd rather not change it now with so much work on the scheduler going on, nor would I want metrics to report something that is not in line with what is logged.
In reply to this post by Piotr Nowojski-3
Hi Piotrek,
Thanks for the insightful feedback; you have indeed hit the trickiest parts and concerns.

> 1. Do we currently account state restore as “RUNNING”? If yes, this might be incorrect from your perspective.

As Chesnay said, initializeState is called in StreamTask.invoke after the transition to RUNNING, so task state restore currently happens during RUNNING. I think accounting state restore as running is fine, since state size is the user's artifact, as long as we can detect service errors during restore (indeed, DFS issues usually happen at createCheckpointStorage (e.g., an S3 server error) and RocksDB issues happen at initializeState in StreamTask.invoke). We can discuss whether a separate state is needed to track restore and running separately, but it seems to add too many messages in common paths just for tracking.

> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that state of the job, is “minimum” of it’s Tasks’ states, so Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.

Right. For RUNNING, all the tasks in the graph must have transitioned to RUNNING. For the others, the SCHEDULING stage begins when the first task transitions to SCHEDULED, and the DEPLOYING stage begins when the first task transitions to DEPLOYING. This would work well especially for eager scheduling and the full-restart failover strategy. With individual or partial restart, we may not need to specifically track the SCHEDULING and DEPLOYING states; we can treat the job as running and rely on the progress monitor.

> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule part of the job graphs in stages. For example do not schedule probe side of the join until build side is done/completed.

Exactly. I have looked roughly at the batch side, but not in detail yet, and I am aware of the ongoing scheduling work. The initial focus of breaking out multiple states like scheduling/deploying would be only on streaming with eager scheduling. I need to give more thought to how to deal with batch/lazy scheduling.

> What exactly would you like to report here? List of exception with downtime caused by it, for example: exception X caused a job to be down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes state restore?

Basically, the initial cause is traced back from each component of downtime, and that component is accounted to a certain type, like user or system, based on the classification. So you're right. The interesting part here is secondary failures. For example, a user error causes a job to restart, but then scheduling fails because of a system issue. We need to account the failing and restarting time to user, while the scheduling time on restart (e.g., a 5 min timeout) goes to system. Another example: a system error causes a job to fail, but one of the user functions does not react to cancellation (for full restart); the prolonged failing time (e.g., a 3 min watchdog timeout) shouldn’t be accounted to system (of course, the other way around has been seen, e.g., FLINK-5463).

> Why do you think about implementing classifiers? Couldn’t we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we throw correct exception types + handle/wrap exceptions correctly when crossing Flink system/user code border? This way we could know exactly whether exception occurred in the user code or in Flink code.

I think the classifier here is complementary to the exception type approach. In this context, a classifier is "f(exception) -> type". The type is used as a metric dimension, to set alerts on certain types or to break downtime down by type (the type is not fixed to just "user" or "system" but can be more specific and customizable, like statebackend and network). If we do wrap exceptions perfectly as you said, f() only has to look at the exception type and return its corresponding type.

Initially we also thought complete wrapping would be ideal. However, even code inside a UDF can call into the Flink framework (e.g., a state update) or call out to dependent services, which a service provider may want to classify separately. In addition, Flink allows users to use lower-level APIs like StreamOperator, which blurs the border a little. Those would make complete wrapping challenging. Besides, stack-based classification beyond the exception type could still be needed for stuck progress classification.

Without instrumentation, one of the base classifiers that works for our environment in many cases is a user-class-loader classifier, which can detect whether an exception is thrown from a class loaded from the user JAR/artifact (although this may be less useful in an environment where users' artifacts can be installed directly in the system lib/, service providers would opt for self-contained jar submission, keeping the system environment system-only).

> One thing that might be tricky is if error in Flink code is caused by user’s mistake.

Right, this is the trickiest part. Based on our analysis with real data, the most ambiguous cases are custom serialization and out-of-resource errors. The former usually surfaces in Flink runtime code rather than in the UDF. In the latter, the Flink stack is just a victim of a resource hog/leak in user code (OOM, too many open files). For the serialization issue, we've been looking at (and learning from) various serialization errors seen in the field to get a reasonable classification. For out-of-resource errors, rather than a user vs. system classification, we can tag the type as "resource" and rely on dumps (e.g., a heap dump) and postmortem analysis on an as-needed basis.

> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least couple of different ways. Tricky part would be to determine whether this is caused by user or not, but probably some simple stack trace probing on back pressured task once every N seconds should solve this - similar how sampling profilers work.

Again you're right, and like you said, this part would mostly reuse existing building blocks such as latency markers and backpressure sampling. If configured only for progress monitoring and not for latency distribution tracking, the latency marker can be lightweight: it skips the histogram update and just updates the latest timestamp, with a longer period so as not to adversely affect performance. Once stuck progress is detected, stack sampling can tell us more about the context that causes the backpressure.

> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.

Agreed. Once some initial discussion clears things up at least at a high level, I can start more independent threads.

Best,
Hwanju
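The "f(exception) -> type" classifier and the user-class-loader base classifier described in this reply could be sketched roughly as below. This is only an illustration under assumptions, not the proposed design: `FailureClassifier`, `UserClassLoaderClassifier` and `ClassifierChain` are hypothetical names, the type labels are plain strings, and "thrown from a user class" is approximated by checking the top stack frame of each throwable in the cause chain against the user class loader.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical plug-in point: maps a failure to a type label, or abstains. */
interface FailureClassifier {
    Optional<String> classify(Throwable failure);
}

/** Labels a failure "user" if it, or one of its causes, was thrown from a user-loaded class. */
final class UserClassLoaderClassifier implements FailureClassifier {
    private static final int MAX_CAUSE_DEPTH = 32; // guards against cyclic cause chains
    private final ClassLoader userClassLoader;

    UserClassLoaderClassifier(ClassLoader userClassLoader) {
        this.userClassLoader = Objects.requireNonNull(userClassLoader);
    }

    @Override
    public Optional<String> classify(Throwable failure) {
        int depth = 0;
        for (Throwable t = failure; t != null && depth < MAX_CAUSE_DEPTH; t = t.getCause(), depth++) {
            StackTraceElement[] frames = t.getStackTrace();
            // Assumption: the top frame is the throw site of this throwable.
            if (frames.length > 0 && isUserClass(frames[0].getClassName())) {
                return Optional.of("user");
            }
        }
        return Optional.empty();
    }

    private boolean isUserClass(String className) {
        try {
            // Resolve without initializing, then compare the defining class loader.
            Class<?> c = Class.forName(className, false, userClassLoader);
            return c.getClassLoader() == userClassLoader;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}

/** Asks each pluggable classifier in order; the first non-empty answer wins. */
final class ClassifierChain {
    private final List<FailureClassifier> classifiers;
    private final String fallbackType;

    ClassifierChain(List<FailureClassifier> classifiers, String fallbackType) {
        this.classifiers = classifiers;
        this.fallbackType = fallbackType;
    }

    String classify(Throwable failure) {
        for (FailureClassifier classifier : classifiers) {
            Optional<String> type = classifier.classify(failure);
            if (type.isPresent()) {
                return type.get();
            }
        }
        return fallbackType;
    }
}
```

Ordering matters in such a chain: placing a "resource" classifier (for example one matching `OutOfMemoryError`) ahead of the class-loader classifier keeps out-of-resource errors from being attributed to whichever side happened to hit the limit. Checking only the top frame is a simplification; it would miss, for instance, an exception thrown inside a JDK method that user code called.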
Hi Hwanju & Chesnay,
Regarding various things that both of you mentioned, like accounting of state restoration separately or batch scheduling, we can always acknowledge some limitations of the initial approach and maybe we can address them later if we evaluate it worth the effort. Generally speaking all that you have written make sense to me, so +1 from my side to split the discussion into separate threads. Piotrek > On 17 May 2019, at 08:57, Kim, Hwanju <[hidden email]> wrote: > > Hi Piotrek, > > Thanks for insightful feedback and indeed you got most tricky parts and concerns. > >> 1. Do we currently account state restore as “RUNNING”? If yes, this might be incorrect from your perspective. > > As Chesnay said, initializeState is called in StreamTask.invoke after transitioning to RUNNING. So, task state restore part is currently during RUNNING. I think accounting state restore as running seems fine, since state size is user's artifact, as long as we can detect service error during restore (indeed, DFS issue usually happens at createCheckpointStorage (e.g., S3 server error) and RocksDB issue happens at initializeState in StreamTask.invoke). We can discuss about the need to have separate state to track restore and running separately, but it seems to add too many messages in common paths just for tracking. > >> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that state of the job, is “minimum” of it’s Tasks’ states, so Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED. > > Right. For RUNNING, all the tasks in the graph transitions to RUNNING. For others, when the first task transitions to SCHEDULED, SCHEDULING stage begins, and when the first task transitions to DEPLOYING, it starts DEPLOYING stage. This would be fine especially for eager scheduling and full-restart fail-over strategy. 
In the individual or partial restart, we may not need to specifically track SCHEDULING and DEPLOYING states while treating job as running relying on progress monitor. > >> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule part of the job graphs in stages. For example do not schedule probe side of the join until build side is done/completed. > > Exactly. I have roughly looked at batch side, but not in detail yet and am aware of ongoing scheduling work. Initial focus of breaking out to multiple states like scheduling/deploying would be only for streaming with eager scheduling. Need to give more thought how to deal with batching/lazy scheduling. > >> What exactly would you like to report here? List of exception with downtime caused by it, for example: exception X caused a job to be down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes state restore? > > Basically, initial cause is traced back from each component of downtime, which is accounted to a certain type like user or system based on the classification. So you're right. Interesting part here is about secondary failure. For example, a user error causes a job to restart but then scheduling is failed by system issue. We need to account failing, restarting time to user, while scheduling time on restart (e.g,. 5min timeout) is to system. A further example is that a system error causes a job to be failing, but one of the user function is not reacting to cancellation (for full-restart), prolonged failing time (e.g., watchdog timeout 3min) shouldn’t be accounted to system (of course, the other way around has been seen -- e.g., FLINK-5463). > >> Why do you think about implementing classifiers? 
Couldn’t we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we throw correct exception types + handle/wrap exceptions correctly when crossing Flink system/user code border? This way we could know exactly whether exception occurred in the user code or in Flink code. > > I think classifier here is complementary to exception type approach. In this context, classifier is "f(exception) -> type". Type is used as metric dimension to set alert on certain types or have downtime breakdown on each type (type is not just fixed to "user" or "system" but can be more specific and customizable like statebackend and network). If we do wrap exceptions perfectly as you said, f() is simple enough to look at Exception type and then return its corresponding type. > > Initially we also thought complete wrapping would be ideal. However, even inside UDF, it can call in Flink framework like state update or call out dependent services, which service provider may want to classify separately. In addition, Flink allows user to use lower level API like streamoperator to make the border a little blurring. Those would make complete wrapping challenging. Besides, stack-based classification beyond exception type could still be needed for stuck progress classification. > > Without instrumentation, one of base classifiers that work for our environment in many cases is user-class-loader classifier, which can detect if an exception is thrown from the class loaded from user JAR/artifact (although this may be less desirable in an environment where user's artifacts can be installed directly in system lib/, but service providers would be opting in self-contained jar submission keeping system environment for system-only). > >> One thing that might be tricky is if error in Flink code is caused by user’s mistake. > > Right, this is the trickiest part. 
Based on our analysis with real data, the most ambiguous ones are custom serialization and out-of-resource errors. The former is usually seen in Flink runtime code rather than in UDF. The latter is that Flink stack is just a victim by resource hog/leak of user code (OOM, too many open files). For the serialization issue, we've been looking at (and learning) various serialization errors seen in the field to get reasonable classification. For the out-of-resource, rather than user vs. system classification, we can tag the type as "resource" relying on dump (e.g., heap dump) and postmortem analysis as-needed basis. > >> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least couple of different ways. Tricky part would be to determine whether this is caused by user or not, but probably some simple stack trace probing on back pressured task once every N seconds should solve this - similar how sampling profilers work. > > Again you're right and like you said, this part would be mostly reusing the existing building blocks such as latency marker and backpressure samplings. If configured only with progress monitoring not latency distribution tracking, latency marker can be lightweight skipping histogram update part just updating latest timestamp with longer period not to adversely affect performance. Once stuck progress is detected, stack sampling can tell us more about the context that causes backpressure. > >> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages. > Agreed. Once some level of initial discussion clears things out at least high level, I can start out more independent threads. > > Best, > Hwanju > > On 5/16/19, 2:44 AM, "Piotr Nowojski" <[hidden email]> wrote: > > Hi Hwanju, > > Thanks for starting the discussion. Definitely any improvement in this area would be very helpful and valuable. 
> Generally speaking +1 from my side, as long as we make sure that either such changes do not add performance overhead (which I think they shouldn’t) or they are optional.
>
>> Firstly, we need to account time for each stage of task execution such as scheduling, deploying, and running, to enable better visibility of how long a job takes in which stage while not running user functions.
>
> Couple of questions/remarks:
> 1. Do we currently account state restore as “RUNNING”? If yes, this might be incorrect from your perspective.
> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that the state of the job is the “minimum” of its Tasks’ states, so the Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule part of the job graphs in stages. For example, do not schedule the probe side of the join until the build side is done/completed.
>
>> Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by Java exception notified to job manager on task failure or unhealthy task manager (Flink already maintains a cause but it can be associated with an execution stage for causal tracking)
>
> What exactly would you like to report here? A list of exceptions with the downtime caused by each, for example: exception X caused a job to be down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes state restore?
>
>> Thirdly, downtime reason should be classified into user- or system-induced failure.
>> This needs exception classifier by drawing the line between user-defined functions (or public API) and Flink runtime. This is particularly challenging to have 100% accuracy at one-shot due to empirical nature and custom logic injection like serialization, so pluggable classifier filters are must-have to enable incremental improvement.
>
> Why do you think about implementing classifiers? Couldn’t we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we throw correct exception types + handle/wrap exceptions correctly when crossing Flink system/user code border? This way we could know exactly whether exception occurred in the user code or in Flink code.
>
> One thing that might be tricky is if error in Flink code is caused by user’s mistake.
>
>> Fourthly, stuck progress
>
> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least couple of different ways. Tricky part would be to determine whether this is caused by user or not, but probably some simple stack trace probing on back pressured task once every N seconds should solve this - similar how sampling profilers work.
>
> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.
>
> Piotrek
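The classifier idea discussed above, "f(exception) -> type" built from pluggable filters, could look roughly like the following. This is only a minimal Java sketch under stated assumptions, not the proposed Flink design: `FailureType`, `ExceptionClassifier`, `ResourceClassifier`, `UserCodeClassifier`, and `ClassifierChain` are hypothetical names, and the user-class-loader check is stood in for by a caller-supplied predicate on the throwing class name (a real deployment might instead test whether that class was loaded by the user-code class loader).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical failure types; the thread only asks that types be customizable. */
enum FailureType { USER, SYSTEM, RESOURCE, UNKNOWN }

/** One pluggable filter: returns a type if it recognizes the failure, empty otherwise. */
interface ExceptionClassifier {
    Optional<FailureType> classify(Throwable failure);
}

/** Tags out-of-memory failures as RESOURCE instead of forcing a user/system decision. */
final class ResourceClassifier implements ExceptionClassifier {
    @Override
    public Optional<FailureType> classify(Throwable failure) {
        int depth = 0;
        for (Throwable t = failure; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t instanceof OutOfMemoryError) {
                return Optional.of(FailureType.RESOURCE);
            }
        }
        return Optional.empty();
    }
}

/** Classifies as USER when the root cause was thrown from a class the predicate accepts. */
final class UserCodeClassifier implements ExceptionClassifier {
    // Assumption: stands in for "was this class loaded from the user JAR?".
    private final Predicate<String> isUserClass;

    UserCodeClassifier(Predicate<String> isUserClass) {
        this.isUserClass = isUserClass;
    }

    @Override
    public Optional<FailureType> classify(Throwable failure) {
        Throwable root = failure;
        // Walk to the root cause, bounded to guard against cyclic cause chains.
        for (int depth = 0; root.getCause() != null && depth < 32; depth++) {
            root = root.getCause();
        }
        StackTraceElement[] frames = root.getStackTrace();
        if (frames.length > 0 && isUserClass.test(frames[0].getClassName())) {
            return Optional.of(FailureType.USER);
        }
        return Optional.empty();
    }
}

/** Applies filters in order; the first filter that returns a type wins. */
final class ClassifierChain {
    private final List<ExceptionClassifier> filters;

    ClassifierChain(ExceptionClassifier... filters) {
        this.filters = Arrays.asList(filters);
    }

    FailureType classify(Throwable failure) {
        for (ExceptionClassifier filter : filters) {
            Optional<FailureType> type = filter.classify(failure);
            if (type.isPresent()) {
                return type.get();
            }
        }
        return FailureType.UNKNOWN;
    }
}
```

Because the chain is just an ordered list, a service provider could add a more specific filter (for example for serialization errors) ahead of the generic ones without touching the others, which is the incremental improvement the proposal asks for.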
Hi,
As suggested by Piotrek, the first part, execution state tracking, is now split out into a separate doc:
https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing

We'd appreciate any feedback. I am still using the same email thread to provide full context, but please let me know if it's better to have a separate email thread as well. We will share the remaining parts once they are ready.

Thanks,
Hwanju
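For the execution state tracking part, two ideas raised earlier in the thread can be sketched together: deriving a job-level stage as the "minimum" of its tasks' stages, and keeping a per-cause breakdown of downtime by phase (the "13 minutes" example). The Java below is a minimal sketch under stated assumptions and is not the design in the linked doc: `TaskStage`, `DowntimePhase`, `JobStages`, and `DowntimeLedger` are hypothetical names, and the stages are a simplification rather than Flink's actual execution states.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical task stages in lifecycle order; not Flink's actual execution-state enum. */
enum TaskStage { SCHEDULED, DEPLOYING, RUNNING, COMPLETED }

/** Hypothetical downtime phases, taken from the example breakdown in the thread. */
enum DowntimePhase { SCHEDULING, DEPLOYING, STATE_RESTORE }

final class JobStages {
    private JobStages() {}

    /** Job stage is the earliest stage among its tasks, so RUNNING requires every task to be RUNNING or COMPLETED. */
    static TaskStage jobStage(Collection<TaskStage> taskStages) {
        if (taskStages.isEmpty()) {
            throw new IllegalArgumentException("a job needs at least one task");
        }
        // Enum constants compare by declaration order, so min() picks the earliest stage.
        return Collections.min(taskStages);
    }
}

/** Accumulates downtime per failure cause, broken down by phase. */
final class DowntimeLedger {
    private final Map<String, EnumMap<DowntimePhase, Duration>> byCause = new HashMap<>();

    void record(String cause, DowntimePhase phase, Duration spent) {
        byCause
            .computeIfAbsent(cause, c -> new EnumMap<DowntimePhase, Duration>(DowntimePhase.class))
            .merge(phase, spent, (a, b) -> a.plus(b));
    }

    /** Total downtime attributed to one cause across all phases; zero if the cause is unknown. */
    Duration total(String cause) {
        Duration sum = Duration.ZERO;
        EnumMap<DowntimePhase, Duration> phases = byCause.get(cause);
        if (phases != null) {
            for (Duration d : phases.values()) {
                sum = sum.plus(d);
            }
        }
        return sum;
    }
}
```

With this ordering, a job that still has one task in DEPLOYING is accounted as DEPLOYING, and recording 1 minute of scheduling, 1 minute of deploying, and 11 minutes of state restore against one cause adds up to 13 minutes of downtime for that cause.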
Hi Hwanju,
I looked through the document; however, I’m not the best person to review/judge/discuss the implementation details here. I hope that Chesnay will be able to help in this regard.

Piotrek
(Somehow my email has failed to be sent multiple times, so I am using my personal email account)
Hi,
Piotrek - Thanks for the feedback! I revised the doc as commented.
Here's the second part about exception classification - https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4/edit?usp=sharing
I put cross-links between the first and the second.
Thanks, Hwanju |
Hi Hwanju,
Thanks for the proposal. The enhancement will improve the operability of Flink and make it more service-provider friendly, so in general I am +1 on the proposal. A few questions / thoughts:
1. From what I understand, the current availability metrics <https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#availability> report the time elapsed since the job entered its current state. So the metrics are not cumulative; they only let users know how long the job has been in its current status this one time. From the motivation of this proposal, it seems that we will need an aggregated metric which reports the cumulative time a job has spent in a particular state. So do you propose to add the new metrics while still keeping the old ones?
2. If the metrics are cumulative, one apparent use case is measuring SLA, which is critical to service providers. Are there any other use cases that you have in mind? I am asking because I am wondering whether the aggregation work should be done inside Flink or outside of Flink, or at least by some pluggable component. If I understand correctly, the current design proposes doing the aggregation inside Flink, which means users are responsible for tagging, and Flink simply aggregates the values based on tags. In contrast, if the aggregation is done outside of Flink / by a pluggable component, users will be responsible for both tagging and aggregation. Flink will only make sure that each state change is reported to the users (probably with some kind of cause describing what triggered the state change), but users would have the responsibility and freedom to interpret the information reported by Flink.
3. If we agree on the basic principle that Flink reports and users interpret, maybe we can introduce a pluggable JobStateListener which interprets the state changes reported by Flink and decides what to do. We can have a default implementation which does the aggregation in the current proposal.
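To make point 3 a bit more concrete, here is a rough sketch of what such a listener and a default cumulative-time implementation might look like. This is only an illustration under assumptions: JobStateListener is the name suggested above, but its method signature, the JobStage enum, and the CumulativeTimeListener class are hypothetical and do not exist in Flink.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical job-level stages for illustration; this is NOT Flink's JobStatus enum.
enum JobStage { SCHEDULING, DEPLOYING, RUNNING, FAILING, RESTARTING }

// Hypothetical plug-in point: the runtime reports every transition (with an
// optional cause), and the listener decides how to interpret it.
interface JobStateListener {
    void onStateChange(JobStage from, JobStage to, long timestampMillis, Throwable cause);
}

// Possible default implementation: accumulates the total time spent in each
// stage across restarts, instead of only the time in the current stage.
final class CumulativeTimeListener implements JobStateListener {
    private final Map<JobStage, Long> totalMillis = new EnumMap<>(JobStage.class);
    private long lastTransitionMillis = -1L;

    @Override
    public void onStateChange(JobStage from, JobStage to, long timestampMillis, Throwable cause) {
        // Charge the elapsed interval to the stage that is being left.
        if (from != null && lastTransitionMillis >= 0) {
            totalMillis.merge(from, timestampMillis - lastTransitionMillis, Long::sum);
        }
        lastTransitionMillis = timestampMillis;
        // A classifier could inspect 'cause' here to tag the interval
        // (e.g. user vs. system); that part is omitted from this sketch.
    }

    // Cumulative time in the given stage, excluding the still-open interval.
    long totalMillisIn(JobStage stage) {
        return totalMillis.getOrDefault(stage, 0L);
    }
}
```

With this shape, a service provider who wants a different SLA computation would plug in their own listener, while the default one keeps the in-Flink aggregation described in the proposal. The time spent in the stage the job is currently in is not counted until the next transition arrives, which a real implementation would likely need to handle.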
What do you think?
Thanks,
Jiangjie (Becket) Qin
Firstly, we need to account time > > for each stage of task execution such as scheduling, deploying, and > > running, to enable better visibility of how long a job takes in which > stage > > while not running user functions. Secondly, any downtime in each stage > can > > be associated with a failure cause, which could be identified by Java > > exception notified to job manager on task failure or unhealthy task > manager > > (Flink already maintains a cause but it can be associated with an > execution > > stage for causal tracking). Thirdly, downtime reason should be classified > > into user- or system-induced failure. This needs exception classifier by > > drawing the line between user-defined functions (or public API) and Flink > > runtime — This is particularly challenging to have 100% accuracy at > > one-shot due to empirical nature and custom logic injection like > > serialization, so pluggable classifier filters are must-have to enable > > incremental improvement. Fourthly, stuck progress, where task is > apparently > > running but not being able to process data generally manifesting itself > as > > long backpressure, can be monitored as higher level job availability and > > the runtime can determine whether the reason to be stuck is caused by > user > > (e.g., under-provisioned resource, user function bug) or system (deadlock > > or livelock in Flink runtime). Finally, all the detailed tracking > > information and metrics are exposed via REST and Flink metrics, so that > > Flink dashboard can have enhanced information about job > > execution/availability and operators can set alarm appropriately on > metrics. > > >>> > > >>> Best, > > >>> Hwanju > > >>> > > >> > > >> > > >> > > > > > > > > > > > > > > |
Hi Becket,
Sorry for the late response; I missed this while I was on vacation. Thanks for the feedback! Please find my replies inline below.

On Sun, Jun 16, 2019 at 6:46 PM, Becket Qin <[hidden email]> wrote:

> Hi Hwanju,
>
> Thanks for the proposal. The enhancement will improve the operability of Flink and make it more service provider friendly. So in general I am +1 on the proposal.
> A few questions / thoughts:
>
> 1. From what I understand, the current availability metrics <https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#availability> reports the time elapsed since the job enters this state. So the metric is not accumulative. It just lets the users know how long the job has been in the current status for this one time. From the motivation of this proposal, it seems that we will need an aggregated metric which reports the cumulative time a job is in a particular state. So do you propose to add the new metrics while still keep the old ones?

Right, we want the time metrics to be cumulative elapsed time since the job started, so that one can query/scrape downtime vs. total time. We do not want to replace the old ones, since someone may already rely on their current semantics.

> 2. If the metrics are cumulative, one apparent use case is to measure SLA which is critical to service providers. Other than this use case, is there any other use cases that you have in mind?

Apart from SLA, performance measurement of certain stages such as scheduling or task deployment (although this may still be in line with the SLO purpose for downtime).

> The reason I am asking this is because I am wondering whether the aggregation work should be done inside Flink or outside of Flink, or at least by some pluggable. If I understand correctly, the current design proposes doing the aggregation inside Flink, which means users are responsible for tagging, and Flink simply aggregate the values based on tags. In the contrast, if the aggregation is done outside of Flink / by a pluggable, users will be responsible for both tagging and aggregation. Flink will only make sure that each state change will be reported to the users (probably with some kind of cause of what triggered the state change), but users would have the responsibility and freedom to interpret the information reported by Flink.
>
> 3. If we agree on the basic principle that Flink reports and users interpret, maybe we can introduce a pluggable of JobStateListener which interprets the state change reported by Flink and decide what to do. We can have a default implementation which does the aggregation in the current proposal.

Good point. Having a generic listener would be ideal. Actually, Piotrek pointed me to "[Discuss] Add JobListener (hook) in flink job lifecycle" as the one to leverage. As you mentioned, that discussion is about allowing an external client to listen to job status changes, so that a third-party app can be aware of job status. I was thinking that if that effort could also allow the JM to be a potential client, we could make the execution time tracker use such interfaces. But the focus of that discussion is more on external client apps (not internal components), as Till also mentioned.

Currently, we are considering coupling the tracker rather tightly with JobMaster, since it relies on ExecutionState and the scheduling/failover strategy. With this coupling, it uses method calls instead of RPC for efficiency. Nevertheless, we are defining a clear interface for tracking job status and execution state, so that the internal implementation can be plugged into whatever defines the same interface or a superset of it (basically, old status, new status, and error are required).

To sum up, we hope to see more discussion of a generic listener model along with more use cases. In our case, the execution tracker is considered a client implementation that tracks elapsed time for each state, so it could accommodate other interfaces like the listener.

> What do you think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, May 29, 2019 at 2:42 PM Hwanju Kim <[hidden email]> wrote:
>
> > (Somehow my email has failed to be sent multiple times, so I am using my personal email account)
> >
> > Hi,
> >
> > Piotrek - Thanks for the feedback! I revised the doc as commented.
> >
> > Here's the second part about exception classification -
> > https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4/edit?usp=sharing
> > I put cross-links between the first and the second.
> >
> > Thanks,
> > Hwanju
> >
> > On Fri, May 24, 2019 at 3:57 AM, Piotr Nowojski <[hidden email]> wrote:
> >
> > > Hi Hwanju,
> > >
> > > I looked through the document, however I’m not the best person to review/judge/discuss about implementation details here. I hope that Chesney will be able to help in this regard.
> > >
> > > Piotrek
> > >
> > > > On 24 May 2019, at 09:09, Kim, Hwanju <[hidden email]> wrote:
> > > >
> > > > Hi,
> > > >
> > > > As suggested by Piotrek, the first part, execution state tracking, is now split to a separate doc:
> > > > https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
> > > >
> > > > We'd appreciate any feedback. I am still using the same email thread to provide a full context, but please let me know if it's better to have a separate email thread as well. We will be sharing the remaining parts once ready.
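To make the exchange with Becket above a bit more concrete, here is a minimal Java sketch of the "Flink reports, a listener interprets" idea, with cumulative per-stage time as the default interpretation. Everything in it is an assumption for illustration only: the `JobStage` enum, the `JobStateListener` signature (old stage, new stage, error, timestamp), and the `CumulativeStageTimeTracker` class are hypothetical names, not existing Flink APIs, and the rule "anything other than RUNNING is downtime" is a simplification of the proposal.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical job-level stages; illustrative only, not Flink's JobStatus or ExecutionState. */
enum JobStage { SCHEDULING, DEPLOYING, RUNNING, FAILING, RESTARTING }

/** Hypothetical listener: the runtime reports (old stage, new stage, error); the implementation interprets. */
interface JobStateListener {
    void onStateChange(JobStage oldStage, JobStage newStage, Throwable cause, long timestampMillis);
}

/** One possible default listener: accumulates elapsed time per stage since the job started. */
final class CumulativeStageTimeTracker implements JobStateListener {
    private final Map<JobStage, Long> closedTotals = new EnumMap<>(JobStage.class);
    private final long jobStartMillis;
    private JobStage currentStage;
    private long enteredMillis;

    CumulativeStageTimeTracker(JobStage initialStage, long jobStartMillis) {
        this.currentStage = initialStage;
        this.jobStartMillis = jobStartMillis;
        this.enteredMillis = jobStartMillis;
    }

    @Override
    public synchronized void onStateChange(
            JobStage oldStage, JobStage newStage, Throwable cause, long timestampMillis) {
        // Close the interval of the stage being left and add it to that stage's running total.
        // oldStage and cause are ignored here; a classifier-based listener could use them.
        closedTotals.merge(currentStage, timestampMillis - enteredMillis, Long::sum);
        currentStage = newStage;
        enteredMillis = timestampMillis;
    }

    /** Cumulative time spent in a stage, including the still-open interval if the job is in it now. */
    synchronized long cumulativeMillis(JobStage stage, long nowMillis) {
        long closed = closedTotals.getOrDefault(stage, 0L);
        return stage == currentStage ? closed + (nowMillis - enteredMillis) : closed;
    }

    /** Total elapsed time since the job started. */
    synchronized long totalMillis(long nowMillis) {
        return nowMillis - jobStartMillis;
    }

    /** Simplifying assumption of this sketch: every stage other than RUNNING counts as downtime. */
    synchronized long downtimeMillis(long nowMillis) {
        return totalMillis(nowMillis) - cumulativeMillis(JobStage.RUNNING, nowMillis);
    }
}
```

With such a tracker, a scraper could read `downtimeMillis` and `totalMillis` at any time and compute the downtime fraction itself, while the existing per-state "time since entering the current state" metrics stay untouched. Timestamps are passed in explicitly so the accounting does not depend on a particular clock.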
In reply to this post by Hwanju Kim
Hi,
I am sharing the last doc, which is about the progress monitor (sharing was delayed a little by my vacation):
https://docs.google.com/document/d/1Ov9A7V2tMs4uVimcSeHL5eftRJ3MCJBiVSFNdz8rmjU/edit?usp=sharing

This last one seems fairly independent of the first two (execution tracking and exception classifier), so it could be completely decoupled. Unlike the first two, we may want to discuss this proposal further here on the dev list rather than diving deeper into implementation (there are obviously several more challenging issues).

Thanks,
Hwanju

On Tue, May 28, 2019 at 11:41 PM, Hwanju Kim <[hidden email]> wrote:

> (Somehow my email has failed to be sent multiple times, so I am using my personal email account)
>
> Hi,
>
> Piotrek - Thanks for the feedback! I revised the doc as commented.
>
> Here's the second part about exception classification -
> https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4/edit?usp=sharing
> I put cross-links between the first and the second.
>
> Thanks,
> Hwanju
>
> On Fri, May 24, 2019 at 3:57 AM, Piotr Nowojski <[hidden email]> wrote:
>
>> Hi Hwanju,
>>
>> I looked through the document, however I’m not the best person to review/judge/discuss about implementation details here. I hope that Chesney will be able to help in this regard.
>>
>> Piotrek
>>
>> > On 24 May 2019, at 09:09, Kim, Hwanju <[hidden email]> wrote:
>> >
>> > Hi,
>> >
>> > As suggested by Piotrek, the first part, execution state tracking, is now split to a separate doc:
>> > https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
>> >
>> > We'd appreciate any feedback. I am still using the same email thread to provide a full context, but please let me know if it's better to have a separate email thread as well. We will be sharing the remaining parts once ready.
>> >
>> > Thanks,
>> > Hwanju
>> >
>> > On 5/17/19, 12:59 AM, "Piotr Nowojski" <[hidden email]> wrote:
>> >
>> > Hi Hwanju & Chesney,
>> >
>> > Regarding various things that both of you mentioned, like accounting of state restoration separately or batch scheduling, we can always acknowledge some limitations of the initial approach and maybe we can address them later if we evaluate it worth the effort.
>> >
>> > Generally speaking all that you have written make sense to me, so +1 from my side to split the discussion into separate threads.
>> >
>> > Piotrek
>> >
>> >> On 17 May 2019, at 08:57, Kim, Hwanju <[hidden email]> wrote:
>> >>
>> >> Hi Piotrek,
>> >>
>> >> Thanks for insightful feedback and indeed you got most tricky parts and concerns.
>> >>
>> >>> 1. Do we currently account state restore as “RUNNING”? If yes, this might be incorrect from your perspective.
>> >>
>> >> As Chesnay said, initializeState is called in StreamTask.invoke after transitioning to RUNNING. So, task state restore part is currently during RUNNING. I think accounting state restore as running seems fine, since state size is user's artifact, as long as we can detect service error during restore (indeed, DFS issue usually happens at createCheckpointStorage (e.g., S3 server error) and RocksDB issue happens at initializeState in StreamTask.invoke). We can discuss about the need to have separate state to track restore and running separately, but it seems to add too many messages in common paths just for tracking.
>> >>
>> >>> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that state of the job, is “minimum” of it’s Tasks’ states, so Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
>> >>
>> >> Right. For RUNNING, all the tasks in the graph transitions to RUNNING. For others, when the first task transitions to SCHEDULED, SCHEDULING stage begins, and when the first task transitions to DEPLOYING, it starts DEPLOYING stage. This would be fine especially for eager scheduling and full-restart fail-over strategy. In the individual or partial restart, we may not need to specifically track SCHEDULING and DEPLOYING states while treating job as running relying on progress monitor.
>> >>
>> >>> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule part of the job graphs in stages. For example do not schedule probe side of the join until build side is done/completed.
>> >>
>> >> Exactly. I have roughly looked at batch side, but not in detail yet and am aware of ongoing scheduling work. Initial focus of breaking out to multiple states like scheduling/deploying would be only for streaming with eager scheduling. Need to give more thought how to deal with batching/lazy scheduling.
>> >>
>> >>> What exactly would you like to report here? List of exception with downtime caused by it, for example: exception X caused a job to be down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes state restore?
>> >>
>> >> Basically, initial cause is traced back from each component of downtime, which is accounted to a certain type like user or system based on the classification. So you're right. Interesting part here is about secondary failure. For example, a user error causes a job to restart but then scheduling is failed by system issue. We need to account failing, restarting time to user, while scheduling time on restart (e.g,. 5min timeout) is to system. A further example is that a system error causes a job to be failing, but one of the user function is not reacting to cancellation (for full-restart), prolonged failing time (e.g., watchdog timeout 3min) shouldn’t be accounted to system (of course, the other way around has been seen -- e.g., FLINK-5463).
>> >>
>> >>> Why do you think about implementing classifiers? Couldn’t we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we throw correct exception types + handle/wrap exceptions correctly when crossing Flink system/user code border? This way we could know exactly whether exception occurred in the user code or in Flink code.
>> >>
>> >> I think classifier here is complementary to exception type approach. In this context, classifier is "f(exception) -> type". Type is used as metric dimension to set alert on certain types or have downtime breakdown on each type (type is not just fixed to "user" or "system" but can be more specific and customizable like statebackend and network). If we do wrap exceptions perfectly as you said, f() is simple enough to look at Exception type and then return its corresponding type.
>> >>
>> >> Initially we also thought complete wrapping would be ideal. However, even inside UDF, it can call in Flink framework like state update or call out dependent services, which service provider may want to classify separately. In addition, Flink allows user to use lower level API like streamoperator to make the border a little blurring. Those would make complete wrapping challenging. Besides, stack-based classification beyond exception type could still be needed for stuck progress classification.
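The "f(exception) -> type" idea in the quoted reply above can be sketched as a chain of pluggable filters. This is only an illustration under stated assumptions: `ClassifierFilter`, `ChainedClassifier`, the type labels, and the package prefix `com.example.udf.` are all hypothetical names, not Flink APIs, and matching the top stack frame by package prefix is a simplified stand-in for the stack-based classification mentioned in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ExceptionClassifierSketch {

    /** Hypothetical pluggable filter: returns a type label when it recognizes a throwable. */
    interface ClassifierFilter {
        Optional<String> classify(Throwable t);
    }

    /** f(exception) -> type: asks each filter in order, for the failure and then each cause. */
    static final class ChainedClassifier {
        private final List<ClassifierFilter> filters = new ArrayList<>();
        private final String fallbackType;

        ChainedClassifier(String fallbackType) {
            this.fallbackType = fallbackType;
        }

        ChainedClassifier add(ClassifierFilter filter) {
            filters.add(filter);
            return this;
        }

        String classify(Throwable failure) {
            int depth = 0;
            // Walk the cause chain; the depth limit guards against cyclic causes.
            for (Throwable t = failure; t != null && depth < 32; t = t.getCause(), depth++) {
                for (ClassifierFilter filter : filters) {
                    Optional<String> type = filter.classify(t);
                    if (type.isPresent()) {
                        return type.get();
                    }
                }
            }
            return fallbackType;
        }
    }

    /** Type-based filter: out-of-memory errors are tagged "resource" rather than user or system. */
    static ClassifierFilter resourceFilter() {
        return t -> t instanceof OutOfMemoryError ? Optional.of("resource") : Optional.empty();
    }

    /** Stack-based filter (assumption): "user" if the throwing frame is in the given package. */
    static ClassifierFilter userStackFilter(String userPackagePrefix) {
        return t -> {
            StackTraceElement[] frames = t.getStackTrace();
            boolean thrownInUserCode =
                    frames.length > 0 && frames[0].getClassName().startsWith(userPackagePrefix);
            return thrownInUserCode ? Optional.of("user") : Optional.empty();
        };
    }

    public static void main(String[] args) {
        ChainedClassifier classifier = new ChainedClassifier("system")
                .add(resourceFilter())
                .add(userStackFilter("com.example.udf."));
        System.out.println(classifier.classify(new OutOfMemoryError("Java heap space")));
    }
}
```

Because filters are consulted in order and new ones can be added without touching existing ones, the classification can be refined incrementally as new failure patterns are seen in the field.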
>> >>
>> >> Without instrumentation, one of base classifiers that work for our environment in many cases is user-class-loader classifier, which can detect if an exception is thrown from the class loaded from user JAR/artifact (although this may be less desirable in an environment where user's artifacts can be installed directly in system lib/, but service providers would be opting in self-contained jar submission keeping system environment for system-only).
>> >>
>> >>> One thing that might be tricky is if error in Flink code is caused by user’s mistake.
>> >>
>> >> Right, this is the trickiest part. Based on our analysis with real data, the most ambiguous ones are custom serialization and out-of-resource errors. The former is usually seen in Flink runtime code rather than in UDF. The latter is that Flink stack is just a victim by resource hog/leak of user code (OOM, too many open files). For the serialization issue, we've been looking at (and learning) various serialization errors seen in the field to get reasonable classification. For the out-of-resource, rather than user vs. system classification, we can tag the type as "resource" relying on dump (e.g., heap dump) and postmortem analysis as-needed basis.
>> >>
>> >>> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least couple of different ways. Tricky part would be to determine whether this is caused by user or not, but probably some simple stack trace probing on back pressured task once every N seconds should solve this - similar how sampling profilers work.
>> >>
>> >> Again you're right and like you said, this part would be mostly reusing the existing building blocks such as latency marker and backpressure samplings. If configured only with progress monitoring not latency distribution tracking, latency marker can be lightweight skipping histogram update part just updating latest timestamp with longer period not to adversely affect performance. Once stuck progress is detected, stack sampling can tell us more about the context that causes backpressure.
>> >>
>> >>> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.
>> >> Agreed. Once some level of initial discussion clears things out at least high level, I can start out more independent threads.
>> >>
>> >> Best,
>> >> Hwanju
>> >>
>> >> On 5/16/19, 2:44 AM, "Piotr Nowojski" <[hidden email]> wrote:
>> >>
>> >> Hi Hwanju,
>> >>
>> >> Thanks for starting the discussion. Definitely any improvement in this area would be very helpful and valuable. Generally speaking +1 from my side, as long as we make sure that either such changes do not add performance overhead (which I think they shouldn’t) or they are optional.
>> >>
>> >>> Firstly, we need to account time for each stage of task execution such as scheduling, deploying, and running, to enable better visibility of how long a job takes in which stage while not running user functions.
>> >>
>> >> Couple of questions/remarks:
>> >> 1. Do we currently account state restore as “RUNNING”? If yes, this might be incorrect from your perspective.
>> >> 2a. This might be more tricky if various Tasks are in various stages. For example in streaming, it should be safe to assume that state of the job, is “minimum” of it’s Tasks’ states, so Job should be accounted as RUNNING only if all of the Tasks are either RUNNING or COMPLETED.
>> >> 2b. However in batch - including DataStream jobs running against bounded data streams, like Blink SQL - this might be more tricky, since there are ongoing efforts to schedule part of the job graphs in stages. For example do not schedule probe side of the join until build side is done/completed.
>> >>
>> >>> Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by Java exception notified to job manager on task failure or unhealthy task manager (Flink already maintains a cause but it can be associated with an execution stage for causal tracking)
>> >>
>> >> What exactly would you like to report here? List of exception with downtime caused by it, for example: exception X caused a job to be down for 13 minutes, 1 minute in scheduling, 1 minute deploying, 11 minutes state restore?
>> >>
>> >>> Thirdly, downtime reason should be classified into user- or system-induced failure. This needs exception classifier by drawing the line between user-defined functions (or public API) and Flink runtime - This is particularly challenging to have 100% accuracy at one-shot due to empirical nature and custom logic injection like serialization, so pluggable classifier filters are must-have to enable incremental improvement.
>> >>
>> >> Why do you think about implementing classifiers? Couldn’t we classify exceptions by exception type, like `FlinkUserException`, `FlinkNetworkException`, `FlinkStateBackendException` … and make sure that we throw correct exception types + handle/wrap exceptions correctly when crossing Flink system/user code border? This way we could know exactly whether exception occurred in the user code or in Flink code.
>> >>
>> >> One thing that might be tricky is if error in Flink code is caused by user’s mistake.
>> >>
>> >>> Fourthly, stuck progress
>> >>
>> >> Hmmm, this might be tricky. We can quite easily detect which exact Task is causing back pressure in at least couple of different ways. Tricky part would be to determine whether this is caused by user or not, but probably some simple stack trace probing on back pressured task once every N seconds should solve this - similar how sampling profilers work.
>> >>
>> >> Luckily it seems like those four issues/proposals could be implemented/discussed independently or in stages.
>> >>
>> >> Piotrek
>> >>
>> >>> On 11 May 2019, at 06:50, Kim, Hwanju <[hidden email]> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I am Hwanju at AWS Kinesis Analytics. We would like to start a discussion thread about a project we consider for Flink operational improvement in production. We would like to start conversation early before detailed design, so any high-level feedback would welcome.
>> >>>
>> >>> For service providers who operate Flink in a multi-tenant environment, such as AWS Kinesis Data Analytics, it is crucial to measure application health and clearly differentiate application unavailability issue caused by Flink framework or service environment from the ones caused by application code. The current metrics of Flink represent overall job availability in time, it still needs to be improved to give Flink operators better insight for the detailed application availability. The current availability metrics such as uptime and downtime measures the time based on the running state of a job, which does not necessarily represent actual running state of a job (after a job transitions to running, each task should still be scheduled/deployed in order to run user-defined functions). The detailed view should enable operators to have visibility on 1) how long each specific stage takes (e.g., task scheduling or deployment), 2) what failure is introduced in which stage leading to job downtime, 3) whether such failure is classified to user code error (e.g., uncaught exception from user-defined function) or platform/environmental errors (e.g., checkpointing issue, unhealthy nodes hosting job/task managers, Flink bug). The last one is particularly needed to allow Flink operators to define SLA where only a small fraction of downtime should be introduced by service fault. All of these visibility enhancements can help community detect and fix Flink runtime issues quickly, whereby Flink can become more robust operating system for hosting data analytics applications.
>> >>>
>> >>> The current proposal is as follows. Firstly, we need to account time for each stage of task execution such as scheduling, deploying, and running, to enable better visibility of how long a job takes in which stage while not running user functions. Secondly, any downtime in each stage can be associated with a failure cause, which could be identified by Java exception notified to job manager on task failure or unhealthy task manager (Flink already maintains a cause but it can be associated with an execution stage for causal tracking). Thirdly, downtime reason should be classified into user- or system-induced failure. This needs exception classifier by drawing the line between user-defined functions (or public API) and Flink runtime - This is particularly challenging to have 100% accuracy at one-shot due to empirical nature and custom logic injection like serialization, so pluggable classifier filters are must-have to enable incremental improvement. Fourthly, stuck progress, where task is apparently running but not being able to process data generally manifesting itself as long backpressure, can be monitored as higher level job availability and the runtime can determine whether the reason to be stuck is caused by user (e.g., under-provisioned resource, user function bug) or system (deadlock or livelock in Flink runtime). Finally, all the detailed tracking information and metrics are exposed via REST and Flink metrics, so that Flink dashboard can have enhanced information about job execution/availability and operators can set alarm appropriately on metrics.
>> >>>
>> >>> Best,
>> >>> Hwanju
|
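The stage rule described in the quoted replies above (the job counts as RUNNING only when every task is RUNNING, the SCHEDULING stage begins when the first task is SCHEDULED, and the DEPLOYING stage begins when the first task is DEPLOYING) could be sketched roughly as follows. This is a sketch for streaming jobs with eager scheduling only; the enums, `deriveStage`, and `StageTimer` are hypothetical names for illustration, not Flink's `ExecutionState` or any existing metric.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;

public class JobStageTrackerSketch {

    /** Simplified task states in lifecycle order (assumption: eager scheduling). */
    enum TaskState { CREATED, SCHEDULED, DEPLOYING, RUNNING }

    /** Job-level stages used for time accounting. */
    enum JobStage { CREATED, SCHEDULING, DEPLOYING, RUNNING }

    /** Derives the job stage from a snapshot of task states. */
    static JobStage deriveStage(Collection<TaskState> taskStates) {
        boolean allRunning = !taskStates.isEmpty();
        TaskState furthest = TaskState.CREATED;
        for (TaskState state : taskStates) {
            if (state != TaskState.RUNNING) {
                allRunning = false;
            }
            if (state.compareTo(furthest) > 0) {
                furthest = state;
            }
        }
        if (allRunning) {
            return JobStage.RUNNING;
        }
        switch (furthest) {
            case RUNNING:   // some tasks run, others are still being deployed or scheduled
            case DEPLOYING:
                return JobStage.DEPLOYING;
            case SCHEDULED:
                return JobStage.SCHEDULING;
            default:
                return JobStage.CREATED;
        }
    }

    /** Accumulates the time spent in each stage that has already been left. */
    static final class StageTimer {
        private final Map<JobStage, Long> millisPerStage = new EnumMap<>(JobStage.class);
        private JobStage current;
        private long enteredAtMillis;

        StageTimer(JobStage initial, long nowMillis) {
            this.current = initial;
            this.enteredAtMillis = nowMillis;
        }

        void update(JobStage next, long nowMillis) {
            if (next == current) {
                return;
            }
            millisPerStage.merge(current, nowMillis - enteredAtMillis, Long::sum);
            current = next;
            enteredAtMillis = nowMillis;
        }

        long millisIn(JobStage stage) {
            return millisPerStage.getOrDefault(stage, 0L);
        }
    }

    public static void main(String[] args) {
        StageTimer timer = new StageTimer(JobStage.CREATED, 0L);
        timer.update(deriveStage(Arrays.asList(TaskState.SCHEDULED, TaskState.CREATED)), 100L);
        timer.update(deriveStage(Arrays.asList(TaskState.DEPLOYING, TaskState.SCHEDULED)), 400L);
        timer.update(deriveStage(Arrays.asList(TaskState.RUNNING, TaskState.RUNNING)), 1000L);
        System.out.println("scheduling ms: " + timer.millisIn(JobStage.SCHEDULING));
        System.out.println("deploying ms: " + timer.millisIn(JobStage.DEPLOYING));
    }
}
```

Passing timestamps in explicitly keeps the accounting deterministic; batch or lazily scheduled jobs, which the thread leaves open, are not covered by this rule.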
Hi Hwanju, thanks for the reply.

Regarding the first two proposals, my main concern is whether it is necessary to have something deeply coupled with the Flink runtime. To some extent, the SLA metrics are a kind of custom metric. It would be good if we could support custom metrics in general, instead of only for job state changes. I am wondering if something similar to the following would meet your requirements.

1. Define an *Incident* interface in Flink.
   - The incident may have multiple subclasses, e.g. job state change, checkpoint failure, and other user-defined subclasses.
2. Introduce an IncidentListener interface, which is responsible for handling *incidents* in Flink.
   - Each IncidentListener is responsible for handling one type of *Incident*.
   - An IncidentListener will be provided a context to leverage some services in Flink. For now, we can just expose the metric reporting service.
3. Introduce an IncidentListenerRegistry to allow registering IncidentListeners for an incident type.
   - Multiple IncidentListeners can be registered for each incident type.
   - When an incident occurs, the registered IncidentListeners will be invoked.
   - Users may configure IncidentListeners in the configuration.

The above mechanism has two benefits:

1. Users can provide arbitrary logic to handle incidents, including calculating SLA and so on.
2. In the future we can add more incident types to address other custom incident handling use cases.

There might be some details to be sorted out, such as where the incident handler should run: JM or TM? But those are probably more of detailed design and implementation choices.

Regarding the 3rd doc, I think it is useful to introduce a progress monitoring mechanism. Inserting an in-flow control event also aligns with the fundamental design of Flink. So in general I think it is a good addition. One thing not quite clear to me is which part of the proposal is intended to be done inside Flink and which part might be built as an ecosystem project / pluggable?
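A minimal sketch of how the three proposed pieces (Incident, IncidentListener, IncidentListenerRegistry) might fit together. None of these types exist in Flink today; every name, the `IncidentContext` metric hook, the string-valued job states, and the `DowntimeListener` example are assumptions made purely for illustration, and the question of whether this runs on the JM or the TM is left open as in the message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IncidentSketch {

    /** Hypothetical base type for anything worth reporting. */
    interface Incident {
        long timestampMillis();
    }

    /** One possible subclass: a job state change. */
    static final class JobStateChangeIncident implements Incident {
        final String oldState;
        final String newState;
        private final long timestampMillis;

        JobStateChangeIncident(String oldState, String newState, long timestampMillis) {
            this.oldState = oldState;
            this.newState = newState;
            this.timestampMillis = timestampMillis;
        }

        @Override
        public long timestampMillis() {
            return timestampMillis;
        }
    }

    /** Context handed to listeners; for now it only exposes metric reporting. */
    interface IncidentContext {
        void reportMetric(String name, long value);
    }

    /** Handles exactly one incident type. */
    interface IncidentListener<T extends Incident> {
        void onIncident(T incident, IncidentContext context);
    }

    /** Allows several listeners per incident type and invokes them when an incident occurs. */
    static final class IncidentListenerRegistry {
        private final Map<Class<?>, List<IncidentListener<?>>> listeners = new HashMap<>();

        <T extends Incident> void register(Class<T> type, IncidentListener<T> listener) {
            listeners.computeIfAbsent(type, key -> new ArrayList<>()).add(listener);
        }

        @SuppressWarnings("unchecked")
        <T extends Incident> void publish(T incident, IncidentContext context) {
            // Exact-class lookup; dispatch to supertypes is deliberately left out.
            List<IncidentListener<?>> registered = listeners.get(incident.getClass());
            if (registered == null) {
                return;
            }
            for (IncidentListener<?> listener : registered) {
                ((IncidentListener<T>) listener).onIncident(incident, context);
            }
        }
    }

    /** Example user logic: reports how long the job was outside RUNNING. */
    static final class DowntimeListener implements IncidentListener<JobStateChangeIncident> {
        private long downSinceMillis = -1;

        @Override
        public void onIncident(JobStateChangeIncident incident, IncidentContext context) {
            if ("RUNNING".equals(incident.oldState) && !"RUNNING".equals(incident.newState)) {
                downSinceMillis = incident.timestampMillis();
            } else if ("RUNNING".equals(incident.newState) && downSinceMillis >= 0) {
                context.reportMetric("downtimeMillis", incident.timestampMillis() - downSinceMillis);
                downSinceMillis = -1;
            }
        }
    }

    public static void main(String[] args) {
        IncidentListenerRegistry registry = new IncidentListenerRegistry();
        registry.register(JobStateChangeIncident.class, new DowntimeListener());
        IncidentContext context = (name, value) -> System.out.println(name + "=" + value);
        registry.publish(new JobStateChangeIncident("RUNNING", "FAILING", 1_000L), context);
        registry.publish(new JobStateChangeIncident("RESTARTING", "RUNNING", 4_000L), context);
    }
}
```

Keeping the SLA logic in a listener such as the hypothetical `DowntimeListener` is what would let it live outside the runtime, which is the decoupling the message argues for.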
For example, if we reuse the mechanism above, can we do the following:

1. Introduce a *ProgressMonitoringIncident* in Flink.
   - Each operator will timestamp the incident when the incident flows through the operator. Eventually, there will be N such incidents, where N is the total parallelism of the sink nodes.
   - The SinkOperator will invoke the ProgressMonitoringIncidentListener to handle all such incidents and perform the corresponding analysis (maybe the ExecutionGraph is needed in the context).
2. Users may provide custom logic to analyze the progress.

Thanks,

Jiangjie (Becket) Qin

On Wed, Jul 3, 2019 at 4:53 PM Hwanju Kim <[hidden email]> wrote:

> Hi,
>
> I am sharing the last doc, which is about progress monitor (a little deferred sharing by my vacation):
>
> https://docs.google.com/document/d/1Ov9A7V2tMs4uVimcSeHL5eftRJ3MCJBiVSFNdz8rmjU/edit?usp=sharing
>
> This last one seems like pretty independent from the first two (execution tracking and exception classifier), so it could be completely decoupled. We may want to leave this proposal, apart from the first two, more discussed here in dev list rather than diving deeper into implementation. (there are obviously more several challenging things)
>
> Thanks,
> Hwanju
>
> On Tue, May 28, 2019 at 11:41 PM, Hwanju Kim <[hidden email]> wrote:
>
> > (Somehow my email has failed to be sent multiple times, so I am using my personal email account)
> >
> > Hi,
> >
> > Piotrek - Thanks for the feedback! I revised the doc as commented.
> >
> > Here's the second part about exception classification -
> > https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4/edit?usp=sharing
> > I put cross-links between the first and the second.
> >
> > Thanks,
> > Hwanju
> >
> > On Fri, May 24, 2019 at 3:57 AM, Piotr Nowojski <[hidden email]> wrote:
> >
> >> Hi Hwanju,
> >>
> >> I looked through the document, however I’m not the best person to review/judge/discuss about implementation details here. I hope that Chesney will be able to help in this regard.
> >>
> >> Piotrek
> >>
> >> > On 24 May 2019, at 09:09, Kim, Hwanju <[hidden email]> wrote:
> >> >
> >> > Hi,
> >> >
> >> > As suggested by Piotrek, the first part, execution state tracking, is now split to a separate doc:
> >> >
> >> > https://docs.google.com/document/d/1oLF3w1wYyr8vqoFoQZhw1QxTofmAtlD8IF694oPLjNI/edit?usp=sharing
> >> >
> >> > We'd appreciate any feedback. I am still using the same email thread to provide a full context, but please let me know if it's better to have a separate email thread as well. We will be sharing the remaining parts once ready.
> >> >
> >> > Thanks,
> >> > Hwanju
|