Thanks for raising the concern @shuyi and for the explanation @konstantin.

Glancing at the Flink documentation, it seems the user has full control over the timeout behavior [1]. But unlike the AsyncWaitOperator itself, user code has no straightforward way to access the internal state of the operator to, for example, put the message back into the async buffer with a retry tag. So I also think that offering a set of common timeout-handling strategies would be a good idea for Flink users and could be a very useful feature.

Regarding the questions and concerns:

1. Should the "retry counter" be reset, or continue where it left off?
- Good point: the counter might need to go into operator state if we decide to carry it over across restarts. Functionality-wise, I think it should be reset, because after a restart it no longer represents the same transient state that existed at the time of failure.

2. When should AsyncStream.orderedWait() skip a record?
- I assume this should be configurable by the user. For example, each strategy described by @shuyi could take additional properties, e.g. a combination of (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY).

I've also created a JIRA ticket [2] for the discussion; please feel free to share your thoughts and comments.

--
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
[2] https://issues.apache.org/jira/browse/FLINK-11909

On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf <[hidden email]> wrote:

> Hi Shuyi,
>
> I am not sure. You could handle retries in the user code within
> org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke
> without using a DLQ, as described in my original answer to William. On the
> other hand, I agree that it could be easier for the user, and it is indeed
> a common scenario.
>
> Two follow-up questions come to mind:
>
> - When a Flink job fails and restarts, would you expect the "retry
>   counter" to be reset or to continue where it left off?
> - With AsyncStream.orderedWait(), when would you expect a record to be
>   skipped? After the final timeout, or after the first timeout?
>
> Would you like to create a JIRA ticket [1] for this improvement with
> answers to the questions above, so we can continue to discuss it there?
>
> Best,
>
> Konstantin
>
> [1] https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues
>
> On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen <[hidden email]> wrote:
>
>> Hi Konstantin,
>>
>> (cc Till since he owns the code)
>>
>> For async I/O, failure and retry is a common and expected pattern; in
>> most use cases, users will need to deal with IO failures and retries.
>> Therefore, I think it is better to address the problem in Flink rather
>> than have every user implement custom retry logic in user code, for a
>> better dev experience. We have the same problem in many of our use cases:
>> to enable backoff and retry, we have to put the failed message into a DLQ
>> (another Kafka topic) and re-ingest it from the DLQ topic to retry, which
>> is manual, cumbersome and requires setting up an extra Kafka topic.
>>
>> Can we add multiple strategies for handling async I/O failures to the
>> AsyncWaitOperator? I propose the following strategies:
>>
>> - FAIL_OPERATOR (default & current behavior)
>> - FIX_INTERVAL_RETRY (retry with a configurable fixed interval up to N times)
>> - EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>>
>> What do you guys think? Thanks a lot.
>>
>> Shuyi
>>
>> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf <[hidden email]> wrote:
>>
>>> Hi William,
>>>
>>> the AsyncOperator does not have such a setting. It is "merely" a wrapper
>>> around an asynchronous call, which provides integration with Flink's
>>> state and time management.
>>>
>>> I think the way to go would be to do the exponential back-off in the
>>> user code and set the timeout of the AsyncOperator to the sum of the
>>> timeouts in the user code (e.g. 2s + 4s + 8s + 16s).
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On Thu, Mar 7, 2019 at 5:20 PM William Saar <[hidden email]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there a way to specify an exponential backoff strategy for when
>>>> async function calls fail?
>>>>
>>>> I have an async function that does web requests to a rate-limited API.
>>>> Can you handle that with settings on the async function call?
>>>>
>>>> Thanks,
>>>> William
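For reference, Konstantin's workaround quoted above (doing the backoff inside the user code) could look roughly like the following. This is a minimal sketch against the AsyncFunction/ResultFuture API of the Flink 1.7/1.8 era; BackoffAsyncFunction and queryExternalService are invented names standing in for whatever non-blocking client call the job actually makes, and the attempt limit and backoff values are illustrative assumptions, not anything Flink provides.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class BackoffAsyncFunction extends RichAsyncFunction<String, String> {

    private static final int MAX_ATTEMPTS = 4;
    private static final long INITIAL_BACKOFF_MS = 2_000L; // 2s, 4s, 8s, ...

    private transient ScheduledExecutorService retryScheduler;

    @Override
    public void open(Configuration parameters) {
        retryScheduler = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void close() {
        if (retryScheduler != null) {
            retryScheduler.shutdownNow();
        }
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        attempt(key, resultFuture, 1, INITIAL_BACKOFF_MS);
    }

    private void attempt(String key, ResultFuture<String> resultFuture, int attempt, long backoffMs) {
        queryExternalService(key).whenComplete((response, error) -> {
            if (error == null) {
                resultFuture.complete(Collections.singleton(response));
            } else if (attempt >= MAX_ATTEMPTS) {
                // Out of retries: surface the failure, which fails the operator.
                resultFuture.completeExceptionally(error);
            } else {
                // Schedule the next attempt with doubled backoff.
                retryScheduler.schedule(
                        () -> attempt(key, resultFuture, attempt + 1, backoffMs * 2),
                        backoffMs, TimeUnit.MILLISECONDS);
            }
        });
    }

    /** Placeholder for the actual non-blocking call to the rate-limited API. */
    private CompletableFuture<String> queryExternalService(String key) {
        return CompletableFuture.supplyAsync(() -> "response for " + key);
    }
}

Note that the timeout passed to AsyncDataStream.orderedWait(...) then has to cover the whole worst-case sequence (per-attempt timeouts plus backoff delays, e.g. 2s + 4s + 8s + 16s), otherwise the operator timeout fires before the last retry has a chance to complete.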
Sorry for joining the discussion so late. I agree that we could add some more syntactic sugar for handling failure cases. Looking at the existing interfaces, I think it should be fairly easy to create an abstract class AsyncFunctionWithRetry which extends AsyncFunction and encapsulates the retry logic for asynchronous operations. I think it is not strictly necessary to change the AsyncWaitOperator to add this functionality.

Cheers,
Till

On Wed, Mar 13, 2019 at 5:42 PM Rong Rong <[hidden email]> wrote:

> Thanks for raising the concern @shuyi and for the explanation @konstantin.
> [...]
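To make Till's suggestion concrete, one possible shape for such a base class is sketched below. AsyncFunctionWithRetry does not exist in Flink; this is an assumption about what Till describes, written against the two-method ResultFuture interface of that era and built on RichAsyncFunction so the retry scheduler gets an open/close lifecycle. The asyncInvokeOnce hook and the constructor parameters are invented names for illustration.

import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical base class: subclasses implement a single attempt, the base
// class re-issues failed attempts with a fixed delay up to maxRetries times.
public abstract class AsyncFunctionWithRetry<IN, OUT> extends RichAsyncFunction<IN, OUT> {

    private final int maxRetries;
    private final long retryDelayMs;
    private transient ScheduledExecutorService retryScheduler;

    protected AsyncFunctionWithRetry(int maxRetries, long retryDelayMs) {
        this.maxRetries = maxRetries;
        this.retryDelayMs = retryDelayMs;
    }

    /** One attempt against the external system; concrete subclasses implement this. */
    protected abstract void asyncInvokeOnce(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    @Override
    public void open(Configuration parameters) {
        retryScheduler = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void close() {
        if (retryScheduler != null) {
            retryScheduler.shutdownNow();
        }
    }

    @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        asyncInvokeOnce(input, new RetryOnFailure(input, resultFuture, 0));
    }

    // Wraps the operator's ResultFuture: successes pass through, failures
    // trigger another attempt until the retry budget is exhausted.
    private final class RetryOnFailure implements ResultFuture<OUT> {

        private final IN input;
        private final ResultFuture<OUT> delegate;
        private final int attemptsSoFar;

        RetryOnFailure(IN input, ResultFuture<OUT> delegate, int attemptsSoFar) {
            this.input = input;
            this.delegate = delegate;
            this.attemptsSoFar = attemptsSoFar;
        }

        @Override
        public void complete(Collection<OUT> result) {
            delegate.complete(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            if (attemptsSoFar >= maxRetries) {
                delegate.completeExceptionally(error);
                return;
            }
            retryScheduler.schedule(() -> {
                try {
                    asyncInvokeOnce(input, new RetryOnFailure(input, delegate, attemptsSoFar + 1));
                } catch (Exception e) {
                    delegate.completeExceptionally(e);
                }
            }, retryDelayMs, TimeUnit.MILLISECONDS);
        }
    }
}

Exponential backoff, or the counter-reset semantics discussed earlier in the thread, could be layered on top by making the delay a function of attemptsSoFar.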
Thanks for the feedback @Till.

Yes, I agree that opening up or changing the AsyncWaitOperator does not seem to be a necessity here. I think introducing an "AsyncFunctionBase", and making the current AsyncFunction an extension of it with some of the default behaviors Shuyi suggested, would be a good starting point. To some extent we could also ship some of the strategies discussed here as default building blocks, but I am not sure that is a must once we have the "AsyncFunctionBase". I will try to create a POC for the change and gather feedback on whether the abstract class offers too much or too little flexibility.

Best,
Rong

On Tue, Mar 19, 2019 at 10:32 AM Till Rohrmann <[hidden email]> wrote:

> Sorry for joining the discussion so late. I agree that we could add some
> more syntactic sugar for handling failure cases.
> [...]
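As a closing illustration of the "default building blocks" Rong mentions, the retry properties floated in this thread (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY) could be bundled into a small config object such as the sketch below. None of these types exist in Flink; the strategy names are taken from Shuyi's proposal and everything else is an assumption about how such a config might look.

import java.io.Serializable;

/**
 * Hypothetical config bundle for an AsyncFunctionBase-style class, combining
 * the retry strategy, the retry budget, and the policy applied once the
 * budget is exhausted.
 */
public final class AsyncRetryConfig implements Serializable {

    /** Strategies proposed by Shuyi earlier in the thread. */
    public enum RetryStrategy { FAIL_OPERATOR, FIX_INTERVAL_RETRY, EXP_BACKOFF_RETRY }

    /** What to do once MAX_RETRY_COUNT is exhausted. */
    public enum FailurePolicy { FAIL, SKIP_RECORD }

    private final RetryStrategy strategy;
    private final int maxRetryCount;
    private final long baseDelayMs;
    private final FailurePolicy failurePolicy;

    public AsyncRetryConfig(RetryStrategy strategy, int maxRetryCount,
                            long baseDelayMs, FailurePolicy failurePolicy) {
        this.strategy = strategy;
        this.maxRetryCount = maxRetryCount;
        this.baseDelayMs = baseDelayMs;
        this.failurePolicy = failurePolicy;
    }

    /** Delay before the given (1-based) retry attempt, derived from the strategy. */
    public long delayBeforeAttempt(int attempt) {
        switch (strategy) {
            case FIX_INTERVAL_RETRY:
                return baseDelayMs;
            case EXP_BACKOFF_RETRY:
                return baseDelayMs << (attempt - 1); // 1x, 2x, 4x, ...
            default:
                return 0L; // FAIL_OPERATOR: no retries, so no delay.
        }
    }

    public int getMaxRetryCount() { return maxRetryCount; }

    public FailurePolicy getFailurePolicy() { return failurePolicy; }
}

A caller might then build, for example, new AsyncRetryConfig(RetryStrategy.EXP_BACKOFF_RETRY, 4, 2_000L, FailurePolicy.SKIP_RECORD) and hand it to the hypothetical AsyncFunctionBase.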