Hey all,

We have a Flink job reading from Kafka (specifically, it uses
FlinkKafkaConsumer011). When the job is processing a backlog, it ends up
reading at a significantly high throughput and degrades the underlying
Kafka cluster. If there were a way to rate limit the calls to Kafka (by
controlling how often consumer.poll() is called), it would be a useful
feature for our use case.

Has anyone run into a similar issue? Are there any efforts/thoughts on
implementing a rate-limiting feature in the Flink Kafka connector?

Thanks
Lakshmi
Have you looked at Kafka quotas
(https://kafka.apache.org/0110/documentation.html#design_quotas) to achieve
rate limiting on the consumer side? In your Flink app, you should be able
to set the client.id and configure Kafka to rate limit you.

Regards,
Nagarjun
In reply to this post by Lakshmi Gururaja Rao

We had a similar problem and ended up putting a Guava rate limiter inside
the Kafka consumer to limit the consumption rate. Since we use POJOs, this
was easily done by putting the rate limiter inside the POJO deserializer,
which runs in the Kafka source.

This has the benefit of not slowing down checkpoints, because the source
doesn't have to do alignment. If you don't care about checkpoint alignment,
you can also add a map function with a Guava rate limiter immediately after
the Kafka source. When it throttles, back pressure should eventually cause
the Kafka source to slow down consumption.

Ning
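For readers looking for a concrete starting point, here is a minimal
sketch of the deserializer-based throttle Ning describes, assuming Guava's
RateLimiter and Flink's DeserializationSchema interface. The class name
and the byte-based permit accounting are illustrative, not Ning's actual
code:

    import java.io.IOException;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    import com.google.common.util.concurrent.RateLimiter;

    /**
     * Wraps an existing DeserializationSchema and throttles records as they
     * are decoded. Because the schema runs inside the Kafka source, blocking
     * here slows consumption directly without touching the connector code.
     */
    public class RateLimitedDeserializationSchema<T> implements DeserializationSchema<T> {

        private static final long serialVersionUID = 1L;

        private final DeserializationSchema<T> inner;
        private final double bytesPerSecond;

        // Guava's RateLimiter is not serializable, so create it lazily per subtask.
        private transient RateLimiter rateLimiter;

        public RateLimitedDeserializationSchema(DeserializationSchema<T> inner, double bytesPerSecond) {
            this.inner = inner;
            this.bytesPerSecond = bytesPerSecond;
        }

        @Override
        public T deserialize(byte[] message) throws IOException {
            if (rateLimiter == null) {
                rateLimiter = RateLimiter.create(bytesPerSecond);
            }
            // Block until enough "byte permits" are available (acquire needs > 0).
            rateLimiter.acquire(Math.max(1, message.length));
            return inner.deserialize(message);
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return inner.isEndOfStream(nextElement);
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return inner.getProducedType();
        }
    }

The wrapper drops in wherever the original schema was used, e.g.
new FlinkKafkaConsumer011<>(topic, new RateLimitedDeserializationSchema<>(schema, 10 * 1024 * 1024), props),
with the limit applying per subtask.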
Hi Lakshmi,

As Nagarjun mentioned, you might want to configure a quota on the Kafka
broker side for your Flink connector client.

Thanks,

Jiangjie (Becket) Qin
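For reference, the broker-side setup looks roughly like this (Kafka 0.11
syntax; the client id and byte rate below are examples):

    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
        --add-config 'consumer_byte_rate=10485760' \
        --entity-type clients --entity-name flink-backfill-consumer

On the Flink side, the client.id passed to the consumer just needs to
match. A fragment, assuming the 0.11 connector:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "backlog-job");
    // Must match the --entity-name used for the quota above.
    props.setProperty("client.id", "flink-backfill-consumer");

    FlinkKafkaConsumer011<String> source =
        new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props);

Keep in mind that Kafka applies the quota per client.id per broker, so all
subtasks sharing this id also share the budget on each broker.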
It is preferable for the service to rate limit. The problem is that not all
Kafka setups have that control enabled or support for it.

Even when rate limiting is enabled, it may still be nice for the client to
handle it gracefully.

There was discussion in the past that we should not bloat the Kafka
consumer further, and I agree with that.

On the other hand, it would be good if the consumer could be augmented a
bit to provide hooks for customization (we had done that for the Kinesis
consumer as well).

Thanks,
Thomas
I had the same reaction initially as some of the others on this thread,
which is "use Kafka quotas". I agree that, in general, a service should
protect itself with its own rate limiting rather than building it into
clients like the FlinkKafkaConsumer. However, there are a few reasons we
need to do this in our company currently:

- We can't use Kafka quotas right now because the Kafka vendor we're using
doesn't support them.
- RPC services called from Flink jobs are frequently DDoS'd by Flink apps,
and we simply need to slow the jobs down when processing a backlog to
protect external services. You could argue those services should protect
themselves, and I agree, but for various technical reasons that's not
possible at the moment.
- If you are going to artificially rate limit a Flink job, the best place
to do it is definitely in the source -- otherwise you end up with issues
with back pressure and checkpointing.

That said, I suspect other users have the same issue, so I think it's a
good general feature to add to the Kafka consumer. It already exists in the
Kinesis consumer as well.

In terms of code bloat -- the code is dead simple. It's just adding a Guava
RateLimiter to the poll() loop, and it's opt-in. The code has already been
implemented for this.

@Lakshmi Gururaja Rao <[hidden email]> Can you put up an apache/flink PR
for this, since it's already finished internally?

Anyway, I'm not opposed to making the KafkaConsumer a little more
customizable via adding some hooks if that's what others prefer -- however,
let's also make the rate-limited KafkaConsumer available in the Flink
project at large rather than keeping it internal at Lyft. I think it's
generally useful.

-Jamie
I think it would be reasonable to have a rate limiter option in the
consumer, given that others have also looked to solve this.

I think for this and other optional features, it would be good to implement
them in a way that overrides are possible. Someone else may want to do the
limiting differently, taking into account more/other factors.

Both adding the limiter and making the consumer code more adaptable could
also be split into separate pieces of work.

BTW, is there a JIRA for this?

Thomas
Thanks for adding more context, @Jamie Grier <[hidden email]>.

JIRA for this feature: https://issues.apache.org/jira/browse/FLINK-11501.

Thanks
Lakshmi
Hi Jamie,

Thanks for the explanation. That makes sense to me. I am wondering if there
is a more general way to add a rate limiter to all the connectors rather
than doing it for each individual one. For example, maybe we could put the
rate limiting logic in the Collector / Output, so that all the connectors
(even operators?) could be rate limited.

Thanks,

Jiangjie (Becket) Qin
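To make the Collector idea concrete, a rate-limited Collector wrapper
could look something like the sketch below. This is illustrative only;
how such a wrapper would be wired into the runtime is exactly the open
question:

    import org.apache.flink.util.Collector;

    import com.google.common.util.concurrent.RateLimiter;

    /**
     * Delegating Collector that caps the record emission rate. Wrapping the
     * collector handed to a source or operator would throttle everything
     * emitted through it.
     */
    public class RateLimitedCollector<T> implements Collector<T> {

        private final Collector<T> inner;
        private final RateLimiter rateLimiter;

        public RateLimitedCollector(Collector<T> inner, double recordsPerSecond) {
            this.inner = inner;
            this.rateLimiter = RateLimiter.create(recordsPerSecond);
        }

        @Override
        public void collect(T record) {
            rateLimiter.acquire(); // blocks the emitting thread
            inner.collect(record);
        }

        @Override
        public void close() {
            inner.close();
        }
    }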
+1, and something I was planning to comment on in the JIRA issue.

Also, if rate limiting could effectively stop the stream, then it could be
used to solve a common data enrichment issue.

Logically you want to pause one stream (typically the time series data
being processed) while another stream (typically the broadcast) is
broadcasting an update to enrichment data.

Currently you have to buffer the time series data in your enrichment
function, but if the rate limiter were pluggable, it could detect when this
enrichment update was happening.

— Ken
I initially thought of an approach similar to the Collector idea, by
overriding emitRecord in the fetcher. That makes counting the bytes
difficult, because it runs downstream of decoding.

Another idea for solving this in a reusable way was to have a separate rate
limiting operator chained downstream of the consumer, which would develop
back pressure and slow down the consumer. However, that would interfere
with checkpoint barrier alignment (AFAIK, the checkpoint barrier would
currently also be stuck in the backlog)?

Thomas
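For completeness, the chained-operator variant is simple to express as a
user function. A sketch, subject to the barrier-alignment caveat above;
the rate is per subtask:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    import com.google.common.util.concurrent.RateLimiter;

    /**
     * Pass-through operator that caps the per-subtask record rate. Chained
     * right after the source, the blocking acquire() creates back pressure
     * that eventually slows the Kafka consumer down.
     */
    public class ThrottlingMapFunction<T> extends RichMapFunction<T, T> {

        private static final long serialVersionUID = 1L;

        private final double recordsPerSecondPerSubtask;
        private transient RateLimiter rateLimiter;

        public ThrottlingMapFunction(double recordsPerSecondPerSubtask) {
            this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
        }

        @Override
        public void open(Configuration parameters) {
            // Created here because Guava's RateLimiter is not serializable.
            rateLimiter = RateLimiter.create(recordsPerSecondPerSubtask);
        }

        @Override
        public T map(T value) {
            rateLimiter.acquire();
            return value;
        }
    }

Applied as stream.map(new ThrottlingMapFunction<>(1000)) immediately after
the source; the effective job-wide limit is parallelism times the
per-subtask rate.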
Hi Thomas,

Good point about counting bytes. It would be difficult to throttle the byte
rate with the existing API. And it seems that for sinks we have to do the
rate limiting in the sink implementation anyway. There are a few ways to do
some abstraction, but maybe adding a RateLimiter is trivial enough that we
don't need to worry about reusing the throttling logic.

But in any case, let's make sure the throttling threshold configuration
names are the same for all the sources and sinks, so the config parsing
logic should probably still live in one place. Those are probably
implementation details we can discuss when reviewing the patch.

I am not sure about adding a separate throttling operator. How would that
operator get the serialized size if it is downstream of a source? And how
would that work on the sink side?

Thanks,

Jiangjie (Becket) Qin
Hi Becket,

The throttling operator would suffer from the same issue of not being able
to accurately count bytes.

On the other hand, it can be used by composition, without modifying
existing operators.

As for sinks, wouldn't an operator that adjusts the rate in front of the
sink suffice?

Thomas
Hi Thomas,

Yes, adding a rate limiting operator in front of the sink would work for
record rate limiting.

Another thing I am thinking about is that, for local throttling, throttling
in sources and sinks has some subtle differences. For example, consider
both source and sink as HDFS. For sources, the rate limiter in each task
can work independently without a problem; the total throughput will be
throttled at PARALLELISM * PER_TASK_THRESHOLD.

On the sink side it might be a little different. After some aggregations,
the data might become skewed. In that case, some sink tasks with hot keys
may hit the rate limit and create back pressure, while the other sink tasks
are pretty idle. This could result in over-throttling.

Thanks,

Jiangjie (Becket) Qin
Apologies for the delay in responding here.

The idea of making the RateLimiter config/creation logic generic across
connectors makes sense to me.

In the approach that we used and tested internally, we essentially created
a Guava RateLimiter
<https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html>
within the KafkaConsumerThread with a desired rate, and moved the
consumer.poll() call into a separate method that uses the number of bytes
received from every call to poll() to control the rate (i.e. as a parameter
to the acquire() call). We have abstracted the rate limiter configuration
and creation into a RateLimiterFactory to make it re-usable for other
connectors.

I also added some of the results we got from testing this approach to the
Flink JIRA: https://issues.apache.org/jira/browse/FLINK-11501. I'll share a
PR with the approach shortly, and hopefully we can use that as a starting
point to discuss this feature further.

Thanks
Lakshmi
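To sketch what the separate poll method could look like (illustrative
only; the method name and the RateLimiterFactory wiring are guesses, and
the actual PR may differ):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import com.google.common.util.concurrent.RateLimiter;

    // Inside the consumer thread; rateLimiter would be created once from the
    // configured bytes/second, e.g. by a RateLimiterFactory as described above.
    private ConsumerRecords<byte[], byte[]> getRecordsFromKafka(
            KafkaConsumer<byte[], byte[]> consumer,
            RateLimiter rateLimiter,
            long pollTimeoutMs) {

        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeoutMs);

        if (rateLimiter != null && !records.isEmpty()) {
            long bytes = 0L;
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // serializedKeySize()/serializedValueSize() return -1 for null.
                bytes += Math.max(0, record.serializedKeySize())
                        + Math.max(0, record.serializedValueSize());
            }
            // Pay for this batch after the fact: a large batch delays the next
            // poll long enough to keep the average at the configured rate.
            rateLimiter.acquire((int) Math.min(Math.max(1L, bytes), Integer.MAX_VALUE));
        }
        return records;
    }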
I created a PR with the implementation described above:
https://github.com/apache/flink/pull/7679. Please provide feedback :)

Thanks
Lakshmi