Kinesis getRecords read timeout and retry


Kinesis getRecords read timeout and retry

Thomas Weise
Hi,

I’m working on implementing retry for getRecords in FlinkKinesisConsumer.
We occasionally get transient socket read timeouts. Instead of bubbling up
the exception and forcing a topology reset to the last checkpoint, we want
to retry getRecords. We also want to work with a lower socket read timeout
than the 50s default.

Looking at the current KinesisProxy implementation, I’m aiming to remove
some baked-in assumptions that get in the way of customizing this:

1) AWSUtil.createKinesisClient - statically wired to use default
ClientConfiguration. The user should be able to control all settings that
the SDK exposes instead.

2) Retry in KinesisProxy.getRecords is limited to AmazonServiceException.
Perhaps that is OK as a default, but the user should be able to retry on
other exceptions if desired.
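
For illustration, one way to make the retry policy pluggable is to let the user supply a predicate that classifies exceptions as recoverable. The following is a minimal, self-contained sketch of that idea; the `callWithRetry` helper and the predicate shape are hypothetical, not the actual KinesisProxy API:

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class RetryingCall {

    /**
     * Invoke {@code call}, retrying up to {@code maxAttempts} times, but only
     * for exceptions that the caller-supplied predicate classifies as
     * recoverable (e.g. a socket read timeout). Anything else is rethrown
     * immediately, preserving the current fail-fast behavior.
     */
    public static <T> T callWithRetry(Callable<T> call,
                                      Predicate<Exception> isRecoverable,
                                      int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (!isRecoverable.test(e)) {
                    throw e; // non-retryable: bubble up as before
                }
                last = e; // retryable: remember and try again
            }
        }
        throw last; // retries exhausted
    }
}
```

A user-provided predicate could then, for instance, also treat a wrapped SocketTimeoutException as recoverable, in addition to the AmazonServiceException cases retried today.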

For 1), a generic option could be to set properties on ClientConfiguration
using reflection (the class isn’t serializable but follows Java Bean
conventions). Something like BeanUtils would make it straightforward to
process user-supplied properties with a specific prefix. Is there another
place in the Flink codebase where this style of configuration is used, or a
preferred alternative to BeanUtils?
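
As a sketch of that idea using plain JDK reflection (no BeanUtils dependency): `FakeClientConfiguration` below is a hypothetical stand-in for the SDK's ClientConfiguration, and the `aws.clientconfig.` prefix is made up for illustration:

```java
import java.lang.reflect.Method;
import java.util.Properties;

public class BeanConfig {

    // Hypothetical stand-in for the SDK's ClientConfiguration: a plain
    // Java bean with setters, which is all the reflection below relies on.
    public static class FakeClientConfiguration {
        private int socketTimeout = 50_000;
        private int maxErrorRetry = 3;
        public void setSocketTimeout(int t) { socketTimeout = t; }
        public int getSocketTimeout() { return socketTimeout; }
        public void setMaxErrorRetry(int r) { maxErrorRetry = r; }
        public int getMaxErrorRetry() { return maxErrorRetry; }
    }

    /**
     * Copy every property carrying the given prefix onto the bean by locating
     * the matching setter and converting the String value to its parameter type.
     */
    public static void applyPrefixed(Object bean, Properties props, String prefix)
            throws Exception {
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(prefix)) continue;
            String name = key.substring(prefix.length());
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            for (Method m : bean.getClass().getMethods()) {
                if (!m.getName().equals(setter) || m.getParameterCount() != 1) continue;
                Class<?> p = m.getParameterTypes()[0];
                String v = props.getProperty(key);
                // Convert the String value to the setter's parameter type.
                Object arg = p == int.class ? Integer.valueOf(v)
                        : p == long.class ? Long.valueOf(v)
                        : p == boolean.class ? Boolean.valueOf(v)
                        : v;
                m.invoke(bean, arg);
            }
        }
    }
}
```

Properties without the prefix are ignored, so user-supplied connector properties and client settings can share one Properties object.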

Thanks

Re: Kinesis getRecords read timeout and retry

Thomas Weise
PR to provide the hooks:  https://github.com/apache/flink/pull/5803



Re: Kinesis getRecords read timeout and retry

Tzu-Li (Gordon) Tai
Hi Thomas,

Thanks for your PRs!

I understand and fully agree with both points you raised.

What I'm still a bit torn with is the current proposed solutions for these
issues (and other similar connector issues).

This might actually call for a good opportunity to bring some thoughts up
about connector contributions. My arguments would be the following:

The proposed solutions break some fundamental designs of the connector code.
For example, in recent PRs for the Kinesis connector we've been proposing
to relax access to the `KinesisProxy` constructor.
AFAIK, this fix was triggered by an inefficiency in the
`KinesisProxy#getShardsOfStream` method which affects shard discovery
performance.
First, such a change ignores the fact that the class is internal (it is
marked as @Internal). It was made private because it handles critical paths
such as record fetching and shard listing, and is not intended to be
modified at all.
Second, the fix in the end did not remove the inefficiency for everyone -
only for advanced users who happened to see the corresponding JIRA and
bothered to override the inefficient implementation themselves.
If there were a fix that benefited all users of the connector in general, I
would definitely be more in favor of that.
The same goes for https://github.com/apache/flink/pull/5803 - I'm not sure
that allowing overrides of the retry logic is ideal. For comparison, in the
Elasticsearch connector we previously introduced a user-facing RetryHandler
API to allow such customizations.

On one hand, I do understand that solving these connector issues properly
would perhaps require more thorough design groundwork and could be more
time-consuming.
On the other hand, I also understand that we need to find a good balance
that allows production users of these connectors to quickly iterate on
issues in the current code and unblock themselves.

My main concern is that our current approach to fixing these issues, IMO,
does not actually encourage good fixes to be contributed back to the
connector code, so the problems would remain as they are.

What do you think? I may also be missing things in the bigger picture here,
so feedback would be highly appreciated.

Cheers,
Gordon


Re: Kinesis getRecords read timeout and retry

Thomas Weise
Hi Gordon,

This is indeed a discussion necessary to have!

The purpose of the previous PRs wasn't to provide solutions to the
originally identified issues, but rather to enable solving them through
customization. What those customizations would be was also communicated,
along with the intent to contribute them subsequently as well, provided
they are deemed broadly applicable enough and we find a reasonable
contribution path.

So far we have implemented the following in custom code:

* use ListShards for discovery (plus also considered to limit the discovery
to a single subtask and share the results between subtasks, which is almost
certainly not something I would propose to add to Flink due to additional
deployment dependencies).

* override emitRecords in the fetcher to provide source watermarking with
idle shard handling. Related discussions for the Kafka consumer show that
it isn't straightforward to arrive at a solution that will satisfy
everyone. I'm still open to contributing those changes, but had not seen a
response to that. Nevertheless, it is key to allow users to implement what
they need for their use case.

* retry certain exceptions in getRecords based on our production learnings.
Whether or not those are applicable to everyone and the Flink
implementation should be changed to retry by default is actually a future
discussion I'm intending to start. But in any case, we need to be able to
make the changes that we need on our end.

* ability to configure the AWS HTTP client when defaults turn out
unsuitable for the use case. This is a very basic requirement and it is
rather surprising that the Flink Kinesis consumer wasn't written to provide
access to the settings that the AWS SDK provides.

I hope the above examples make clear that it is necessary to leave room
for users to augment a base implementation. There is no such thing as a
perfect connector, and there will always be new discoveries by users that
require improvements or changes. Use-case-specific considerations may
require augmenting even the best default behavior; what works for one user
may not work for another.

If I don't have the hooks that the referenced PRs enable, then the
alternative is to fork the code. That would further reduce the likelihood
of changes making their way back to Flink.

I think we agree on the ultimate goal of improving the default
implementation of the connector. There are more fundamental issues with the
Kinesis connector (and other connectors) that I believe require deeper
design work and a rewrite, which go beyond what we discuss here.

Finally, I'm also curious how much appetite there is for contributions in
the connector area? I see that we have now accumulated 340 open PRs, and
review bandwidth seems hard to come by.

Thanks,
Thomas



Re: Kinesis getRecords read timeout and retry

Tzu-Li (Gordon) Tai
Hi Thomas,

I see. If exposing access to these internal classes is a must to enable
further contributions, then I would agree to do so.
I think in the future we should also keep a closer eye on the parts of the
connector code that are highly subject to per-environment modifications,
and keep flexibility in mind as the base assumption (as you stated very
well, there is no "perfect" implementation of a connector, even with the
best default implementations).

Some comments on the specific issues that you mentioned:

* use ListShards for discovery (plus also considered to limit the discovery
> to a single subtask and share the results between subtasks, which is almost
> certainly not something I would propose to add to Flink due to additional
> deployment dependencies).


I think this one is most likely a direct improvement to the connector
already (minus the inter-subtask coordination).
The shard discovery method does not use any other information from the
`describeStream` call, so the alternate API should be a direct replacement.

  * ability to configure the AWS HTTP client when defaults turn out
> unsuitable for the use case. This is a very basic requirement and it is
> rather surprising that the Flink Kinesis consumer wasn't written to provide
> access to the settings that the AWS SDK provides.


I see you have opened a separate JIRA for this (FLINK-9188). And yes, IMO
this is definitely something very desirable in the future.

Finally, I'm also curious how much appetite for contributions in the
> connector areas there is? I see that we have now accumulated 340 open PRs,
> and review bandwidth seems hard to come by.
>

I would personally very much like to see these contributions and
improvements happen.
In the past, the community has indeed fallen behind in keeping pace with
all the contributions, but this is something that most of the committers
are aware of and should fix soon.
I previously looked mostly at connector contributions, and would like to
get up to speed with that again shortly.

Cheers,
Gordon



Re: Kinesis getRecords read timeout and retry

Thomas Weise

On Wed, Apr 18, 2018 at 3:47 AM, Tzu-Li (Gordon) Tai <[hidden email]>
wrote:

> * use ListShards for discovery (plus also considered to limit the discovery
> > to a single subtask and share the results between subtasks, which is
> almost
> > certainly not something I would propose to add to Flink due to additional
> > deployment dependencies).
>
>
> I think this one is most likely a direct improvement to the connector
> already (minus the inter-subtask coordination).
> The shard discovery method does not use any other information from the
> `describeStream` call, so the alternate API should be a direct replacement.
>


Yes, and Kailash is going to open a PR for that soon after a related kink
on the AWS side has been sorted out:

https://issues.apache.org/jira/browse/FLINK-8944



>
>   * ability to configure the AWS HTTP client when defaults turn out
> > unsuitable for the use case. This is a very basic requirement and it is
> > rather surprising that the Flink Kinesis consumer wasn't written to
> provide
> > access to the settings that the AWS SDK provides.
>
>
> I see you have opened a separate JIRA for this (FLINK-9188). And yes, IMO
> this is definitely something very desirable in the future.
>

The JIRA has a commit linked to it that shows how the change will look. I
will rebase that and open a PR.


> Finally, I'm also curious how much appetite for contributions in the
> > connector areas there is? I see that we have now accumulated 340 open
> PRs,
> > and review bandwidth seems hard to come by.
> >
>
> I would personally very much like to see these contributions and
> improvements happen.
> In the past, the community has indeed fallen behind in keeping pace with
> all the contributions, but this is something that most of the committers
> are aware of and should fix soon.
> I previously looked mostly at connector contributions, and would like to
> get up to speed with that again shortly.
>

That's good to know and thanks for your help with the review!

Thomas