Ratelimiting in the Flink Kafka connector

Ratelimiting in the Flink Kafka connector

Lakshmi Gururaja Rao
Hey all,

We have a Flink job reading from Kafka (specifically, it uses
FlinkKafkaConsumer011). When the job is processing a backlog, it can end up
reading at such a high throughput that it degrades the underlying Kafka
cluster. A way to rate limit the calls to Kafka (by controlling how often
consumer.poll() is called) would be a useful feature for our use case.

Has anyone run into a similar issue? Are there any efforts/thoughts
on implementing a rate-limiting feature in the Flink Kafka connector?

Thanks
Lakshmi

Re: Ratelimiting in the Flink Kafka connector

Nagarjun Guraja
Have you looked at Kafka quotas
(https://kafka.apache.org/0110/documentation.html#design_quotas) to achieve
rate limiting on the consumer side? In your Flink app, you should be able
to set the client.id and configure Kafka to rate limit you.
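
For example, something along these lines (a sketch only; the client.id
value, topic name, and broker address below are made up):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

// Give the Flink consumer a stable client.id that a broker-side quota can
// target. "flink-backfill", "events", and "broker1:9092" are placeholders.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");
props.setProperty("group.id", "my-flink-job");
props.setProperty("client.id", "flink-backfill");

FlinkKafkaConsumer011<String> consumer =
    new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props);

// The quota itself is configured on the broker, e.g. (Kafka 0.11 CLI syntax):
//   bin/kafka-configs.sh --zookeeper <zk> --alter \
//     --add-config 'consumer_byte_rate=1048576' \
//     --entity-type clients --entity-name flink-backfill
```

Note that quotas are enforced per broker, so the effective total rate also
depends on how many brokers the consumer reads from.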

Regards,
Nagarjun

*Success is not final, failure is not fatal: it is the courage to continue
that counts.*
*- Winston Churchill -*



Re: Ratelimiting in the Flink Kafka connector

Ning Shi
We had a similar problem and ended up putting a Guava rate limiter inside
the Kafka consumer to limit the consumption rate. Since we use POJOs, this
was easily done by putting the rate limiter inside the POJO deserializer,
which runs in the Kafka source.
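
Roughly like the following (a simplified sketch, not our exact code; this
variant limits the record rate, and creates the limiter lazily because
Guava's RateLimiter is not serializable):

```java
import java.io.IOException;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Wraps an existing schema and blocks in deserialize(), so each source
// subtask cannot emit faster than recordsPerSecond.
public class RateLimitedDeserializationSchema<T> implements DeserializationSchema<T> {
    private final DeserializationSchema<T> inner;
    private final double recordsPerSecond;
    private transient RateLimiter limiter;

    public RateLimitedDeserializationSchema(DeserializationSchema<T> inner,
                                            double recordsPerSecond) {
        this.inner = inner;
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        if (limiter == null) {
            limiter = RateLimiter.create(recordsPerSecond); // per subtask
        }
        limiter.acquire(); // one permit per record
        return inner.deserialize(message);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}
```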

This has the benefit of not slowing down checkpoints, because the source
doesn't have to do alignment. If you don't care about checkpoint alignment,
you can also add a map function with a Guava rate limiter immediately after
the Kafka source, as sketched below. When it throttles, back pressure
should eventually cause the Kafka source to slow down consumption.
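
A sketch of that variant (the class name and rate are illustrative; the
limit is per parallel subtask):

```java
import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Pass-through throttle chained right after the source. When map() blocks,
// back pressure propagates upstream and slows the consumer down.
public class ThrottleMap<T> extends RichMapFunction<T, T> {
    private final double recordsPerSecond;
    private transient RateLimiter limiter;

    public ThrottleMap(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        limiter = RateLimiter.create(recordsPerSecond); // per subtask
    }

    @Override
    public T map(T value) {
        limiter.acquire(); // blocks; upstream backs up
        return value;
    }
}
```

Usage would be something like
env.addSource(kafkaConsumer).map(new ThrottleMap<>(1000.0)).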

Ning

Re: Ratelimiting in the Flink Kafka connector

Becket Qin
Hi Lakshmi,

As Nagarjun mentioned, you might want to configure a quota on the Kafka
broker side for your Flink connector client.

Thanks,

Jiangjie (Becket) Qin


Re: Ratelimiting in the Flink Kafka connector

Thomas Weise
It is preferable for the service to rate limit itself. The problem is that
not all Kafka setups have that control enabled or even support it.

Even when rate limiting is enabled, it may still be *nice* for the client
to handle it gracefully.

There was discussion in the past that we should not bloat the Kafka
consumer further and I agree with that.

On the other hand, it would be good if the consumer could be augmented a
bit to provide hooks for customization (we did that for the Kinesis
consumer as well).

Thanks,
Thomas



Re: Ratelimiting in the Flink Kafka connector

Jamie Grier
I had the same initial reaction as some of the others on this thread --
which is "use Kafka quotas". I agree that in general a service should
protect itself with its own rate limiting rather than building it into
clients like the FlinkKafkaConsumer.

However, there are a few reasons we need to do this in our company
currently:
  - We can't use Kafka quotas right now because the Kafka vendor we're
using doesn't support them.
  - External RPC services called from Flink jobs are frequently DDoS'd when
a job processes a backlog, and we simply need to slow the jobs down to
protect those services. You could argue those services should protect
themselves, and I agree, but for various technical reasons that's not
possible ATM.
  - If you are going to artificially rate limit a Flink job, the best place
to do it is definitely in the source -- otherwise you end up with issues
with backpressure and checkpointing.

That said, I suspect other users have the same issue, so I think it's a
good general feature to add to the Kafka consumer. It already exists in
the Kinesis consumer as well.

In terms of code bloat -- the code is dead simple. It's just adding a
Guava RateLimiter to the poll() loop, and it's opt-in (see the sketch
below). The code has already been implemented for this internally.
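
For illustration, a minimal sketch of the opt-in idea (not the actual
internal code; the method and parameter names are made up):

```java
import com.google.common.util.concurrent.RateLimiter;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// A null limiter means the feature is off and poll() behaves as before.
void runPollLoop(KafkaConsumer<byte[], byte[]> consumer, RateLimiter rateLimiter) {
    while (true) {
        if (rateLimiter != null) {
            rateLimiter.acquire(); // one permit per poll() call
        }
        ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
        // ... hand the records over to the fetcher as usual ...
    }
}
```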

@Lakshmi Gururaja Rao <[hidden email]> Can you put up an apache/flink PR
for this since it's already finished internally?

Anyway, I'm not opposed to making the KafkaConsumer a little more
customizable by adding some hooks if that's what others prefer -- however,
let's also make the rate-limited KafkaConsumer available in the Flink
project at large rather than keeping it internal at Lyft. I think it's
generally useful.

-Jamie



Re: Ratelimiting in the Flink Kafka connector

Thomas Weise
I think it would be reasonable to have a rate limiter option in the
consumer, given that others have also looked to solve this.

I think for this and other optional features, it would be good to implement
them in a way that allows overrides. Someone else may want to do the
limiting differently, taking into account more/other factors.

Both adding the limiter and making the consumer code more adaptable could
also be split into separate pieces of work.

BTW is there a JIRA for this?

Thomas

Re: Ratelimiting in the Flink Kafka connector

Lakshmi Gururaja Rao
Thanks for adding more context, @Jamie Grier <[hidden email]>.

JIRA for this feature: https://issues.apache.org/jira/browse/FLINK-11501.

Thanks
Lakshmi



--
*Lakshmi Gururaja Rao*
SWE, Lyft <http://www.lyft.com/>
217.778.7218

Re: Ratelimiting in the Flink Kafka connector

Becket Qin
Hi Jamie,

Thanks for the explanation. That makes sense to me. I am wondering if there
is a more general way to add a rate limiter to all the connectors rather
than doing it for each individual one. For example, maybe we could put the
rate limiting logic in the Collector / Output, so that all the connectors
(even operators?) could be rate limited; a rough sketch of the idea is below.
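
For instance (purely a sketch of the idea, not an existing Flink API):

```java
import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.util.Collector;

// Wraps whatever Collector a source/operator emits into, charging one
// permit per record before forwarding it downstream.
public class RateLimitingCollector<T> implements Collector<T> {
    private final Collector<T> inner;
    private final RateLimiter limiter;

    public RateLimitingCollector(Collector<T> inner, double recordsPerSecond) {
        this.inner = inner;
        this.limiter = RateLimiter.create(recordsPerSecond);
    }

    @Override
    public void collect(T record) {
        limiter.acquire(); // blocks until a permit is available
        inner.collect(record);
    }

    @Override
    public void close() {
        inner.close();
    }
}
```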

Thanks,

Jiangjie (Becket) Qin


Re: Ratelimiting in the Flink Kafka connector

Ken Krugler
+1, and something I was planning to comment on in the Jira issue.

Also, if rate limiting could effectively stop the stream, then this could
be used to solve a common data enrichment issue.

Logically you want to pause one stream (typically the time series data
being processed) while another stream (typically the broadcast) is
broadcasting an update to enrichment data.

Currently you have to buffer the time series data in your enrichment
function, but if the rate limiter were pluggable, it could detect when this
enrichment update was happening.

 — Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Ratelimiting in the Flink Kafka connector

Thomas Weise
I initially thought of an approach similar to the collector idea:
overriding emitRecord in the fetcher. That makes counting the bytes
difficult, because it runs downstream of decoding.

Another idea for solving this in a reusable way was to have a separate rate
limiting operator chained downstream of the consumer, which would develop
back pressure and slow down the consumer. However, that would interfere
with checkpoint barrier alignment (AFAIK, checkpoint barriers would
currently also be stuck in the backlog).

Thomas

Re: Ratelimiting in the Flink Kafka connector

Becket Qin
Hi Thomas,

Good point about counting bytes. It would be difficult to throttle the
byte rate with the existing API. And it seems that for sinks we have to do
the rate limiting in the sink implementation anyway. There are a few ways
to do some abstraction, but maybe adding a RateLimiter is trivial enough
that we don't need to worry about reusing the throttling logic.

But in any case, let's make sure the throttling threshold configuration
names are the same for all the sources and sinks, so the config parsing
logic should probably still live in one place. Those are implementation
details we can discuss when reviewing the patch.

I am not sure about adding another throttling operator. How would that
operator get the serialized size if it is downstream of a source? And how
would it work on the sink side?

Thanks,

Jiangjie (Becket) Qin

Re: Ratelimiting in the Flink Kafka connector

Thomas Weise
Hi Becket,

The throttling operator would suffer from the same issue of not being able
to accurately count bytes.

On the other hand, it can be used via composition, without modifying
existing operators.

As for sinks, wouldn't an operator that adjusts the rate in front of the
sink suffice?

Thomas

Re: Ratelimiting in the Flink Kafka connector

Becket Qin
Hi Thomas,

Yes, adding a rate limiting operator in front of the sink would work for
record rate limiting.

Another thing I am thinking is that for local throttling, sources and
sinks have some subtle differences. For example, consider a job whose
source and sink are both HDFS. For sources, a rate limiter in each task can
work independently without a problem; the total throughput will be
throttled at PARALLELISM * PER_TASK_THRESHOLD (e.g. 8 subtasks at 10 MB/s
each cap the job at 80 MB/s).

On the sink side it might be a little different. After some aggregations,
the data might become skewed. In that case, some sink tasks with hot keys
may hit the rate limit and create back pressure while the other sink tasks
are fairly idle. This could result in over-throttling.

Thanks,

Jiangjie (Becket) Qin


Re: Ratelimiting in the Flink Kafka connector

Lakshmi Gururaja Rao
Apologies for the delay in responding here.

The idea of making the RateLimiter config/creation logic generic across
connectors makes sense to me.

In the approach that we used and tested internally, we essentially created
a Guava RateLimiter
<https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html>
within the KafkaConsumerThread with a desired rate, and moved the
consumer.poll() call into a separate method that uses the bytes received
from every call to poll() to control the rate (i.e. as a parameter to the
acquire() call); see the sketch below. We have abstracted the rate limiter
configuration and creation into a RateLimiterFactory to make it reusable
for other connectors.
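
In outline, the poll path looks roughly like this (a simplified sketch of
the approach, not the PR verbatim):

```java
import com.google.common.util.concurrent.RateLimiter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Poll, measure the bytes just received, then acquire that many permits so
// the average byte rate converges to the configured limit. The limiter is
// created as RateLimiter.create(bytesPerSecond); Guava requires acquire(n)
// with n >= 1, hence the guard.
ConsumerRecords<byte[], byte[]> pollWithRateLimit(
        KafkaConsumer<byte[], byte[]> consumer,
        RateLimiter rateLimiter,
        long pollTimeoutMs) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeoutMs);
    int bytes = 0;
    for (ConsumerRecord<byte[], byte[]> record : records) {
        bytes += Math.max(record.serializedKeySize(), 0)
               + Math.max(record.serializedValueSize(), 0);
    }
    if (bytes > 0) {
        rateLimiter.acquire(bytes); // throttles subsequent polls
    }
    return records;
}
```

Since the permits are acquired after the poll, the very first batch is not
throttled, but the long-run rate converges to the limit.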

I also added some of the results we got from testing this approach to the
Flink JIRA: https://issues.apache.org/jira/browse/FLINK-11501. I'll share
a PR with the approach shortly, and hopefully we can use that as a starting
point to discuss this feature further.

Thanks
Lakshmi


--
*Lakshmi Gururaja Rao*
SWE, Lyft <http://www.lyft.com/>
217.778.7218

Re: Ratelimiting in the Flink Kafka connector

Lakshmi Gururaja Rao
I created a PR with the implementation described above —
https://github.com/apache/flink/pull/7679. Please provide feedback :)

Thanks
Lakshmi



--
*Lakshmi Gururaja Rao*
SWE, Lyft <http://www.lyft.com/>
217.778.7218