Adding the streaming project to the main repository

Adding the streaming project to the main repository

Gyula Fóra
Hey All,

Quick weekly update on the streaming project:

It was a good week: we implemented a lot of new features and made
considerable progress on the API too. Most notably:

- Cluster performance was measured against Storm on both a simple streaming
wordcount and an iterative algorithm (PageRank), and Flink Streaming was
about 4 times faster (partly because of the output buffers).

- The API now supports simple types instead of Tuple1s (so instead of
DataStream<Tuple1<String>> you can use DataStream<String>).

- The API was updated to match the new function interfaces in the main
project, with both the standard and the Rich functions.

- The directed emit API has been updated to include SplitDataStreams and a
.select(name) method to direct tuples to named outputs.

- We have added a .groupBy(..) operator to use with .reduce, .batchReduce
and .windowReduce, allowing a streaming group-reduce on the whole stream,
on sliding batches, and on sliding time windows (see the sketch after this
list for what the windowed variant computes).

- We have also started refactoring our tests to run in much less time to
avoid Travis errors :)
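
To make the .groupBy(..)/.windowReduce(..) bullet concrete, here is a
minimal plain-Java sketch of the semantics of a grouped reduce over a
sliding time window. This is not the API from the streaming-ready branch;
all class and method names below are made up for illustration.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Illustrative sketch only (not the Flink Streaming API): keeps the records
 * of the last windowSizeMs milliseconds and reduces them per key, which is
 * the semantics behind a grouped sliding-time-window reduce.
 */
public class SlidingWindowGroupReduce<K, V> {

    private static final class Entry<K, V> {
        final long timestampMs;
        final K key;
        final V value;
        Entry(long timestampMs, K key, V value) {
            this.timestampMs = timestampMs;
            this.key = key;
            this.value = value;
        }
    }

    private final Deque<Entry<K, V>> window = new ArrayDeque<>();
    private final long windowSizeMs;
    private final BinaryOperator<V> reducer;

    public SlidingWindowGroupReduce(long windowSizeMs, BinaryOperator<V> reducer) {
        this.windowSizeMs = windowSizeMs;
        this.reducer = reducer;
    }

    /** Add a record and drop everything that fell out of the time window. */
    public void add(long timestampMs, K key, V value) {
        window.addLast(new Entry<>(timestampMs, key, value));
        while (!window.isEmpty()
                && window.peekFirst().timestampMs < timestampMs - windowSizeMs) {
            window.removeFirst();
        }
    }

    /** Reduce the current window contents per key (one result per group). */
    public Map<K, V> reducePerKey() {
        Map<K, V> result = new HashMap<>();
        for (Entry<K, V> e : window) {
            result.merge(e.key, e.value, reducer);
        }
        return result;
    }
}

For example, with word keys, a value of 1 per record and Integer::sum as
the reducer, reducePerKey() yields a per-word count over the last
windowSizeMs milliseconds, which is roughly the shape of a windowed
wordcount.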

You can check out all these changes on Git, of course:

https://github.com/mbalassi/incubator-flink/commits/streaming-ready


Cheers,
Gyula

Re: Adding the streaming project to the main repository

Kostas Tzoumas
Wow! Incredible :-) Can you share more details about the experiments you
ran (cluster setup, jobs, etc)?





Re: Adding the streaming project to the main repository

Ufuk Celebi

On 08 Aug 2014, at 16:07, Kostas Tzoumas <[hidden email]> wrote:

> Wow! Incredible :-) Can you share more details about the experiments you
> ran (cluster setup, jobs, etc)?

Same here. :-)

I would be especially interested in what you mean by "partly because of the output buffers".

Best wishes,

Ufuk

Re: Adding the streaming project to the main repository

Gyula Fóra
Hey guys,

I might not be able to give you all the details right now, because some of
the data is on my colleague's computer, but I'm gonna try :)

We have a 30-machine cluster at SZTAKI with 2 cores each; not a powerhouse,
but good for experimenting.

We tested both Flink Streaming and Storm with a lot of different settings,
ranging from a couple of machines to full cluster utilization. We used the
default settings for the Flink config, and also the default settings for
Storm.

We ran two experiments. The first one was a simple streaming wordcount
example, implemented exactly the same way in both systems. We ran the
streaming jobs for about 15-30 minutes each for the experiments, but we
also tested longer runs. The other test we ran was a streaming PageRank to
test the performance of the iterations (in Storm you can just connect
cycles in the topology, so we wanted to see how it compares to the
out-of-topology record passing). On both examples Flink Streaming was on
average about 3-4 times faster than the same implementation in Storm. For
Storm, fault tolerance was turned off for the sake of the experiment,
because that's still an open issue for us.

You can check out the implementations here in this repo:

https://github.com/mbalassi/streaming-performance
(there have been a lot of API changes this week compared to what you find
in this repo)

One of our colleagues who implemented these is also working on a neat GUI
for running performance tests; it's pretty cool :)

Going back to Ufuk's question regarding what I meant by the role of the
output buffers: one main difference between Storm and Flink Streaming
could be the way the output buffers are handled. In Storm you set the
output buffer size as a number of records, which is by default relatively
small for better latency. In Flink the output buffer is typically much
larger, which gives you higher throughput (that's what we were measuring).
To be able to provide some latency constraint, we implemented a
RecordWriter that flushes the output every predefined number of
milliseconds (or when the buffer is full). This seems to be a better way
of providing a latency guarantee than presetting the buffer size as a
number of records.
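
To illustrate the idea (this is not the actual RecordWriter from our
branch, just a hypothetical sketch with made-up names): records are
buffered for throughput, the buffer is flushed when it fills up, and a
background task additionally flushes it every few milliseconds so latency
stays bounded.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Sketch only: buffer for throughput, flush on a timer for latency. */
public class TimedFlushingWriter<T> implements AutoCloseable {

    private final List<T> buffer = new ArrayList<>();
    private final int capacity;                 // flush when this many records are buffered
    private final Consumer<List<T>> downstream; // stands in for the network channel
    private final ScheduledExecutorService flusher =
            Executors.newSingleThreadScheduledExecutor();

    public TimedFlushingWriter(int capacity, long flushIntervalMs,
                               Consumer<List<T>> downstream) {
        this.capacity = capacity;
        this.downstream = downstream;
        // The periodic flush is just a thread waking up at a fixed rate.
        flusher.scheduleAtFixedRate(this::flush, flushIntervalMs,
                flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void emit(T record) {
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush();                            // buffer full: flush immediately
        }
    }

    public synchronized void flush() {
        if (!buffer.isEmpty()) {
            downstream.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    @Override
    public void close() {
        flusher.shutdown();
        flush();                                // push out whatever is left
    }
}

The only extra cost is the scheduler thread waking up periodically, which
is negligible compared to flushing after every record.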

I think this is a similar case to the comparison of Storm and Spark. Of
course Spark has much higher throughput, but the question is how we can
fine-tune the trade-off between latency and throughput; we will need more
tests to answer these questions :)

But I think there are more performance improvements to come; we are
planning to do topology optimizations similar to the batch API in the
future.


Cheers,

Gyula

Re: Adding the streaming project to the main repository

Ufuk Celebi
Thanks for the detailed explanation! Very nice to hear. :)

If your flushing writer does not give you enough control over the trade-off (in general you cannot know how large records will be, right?), we can have a chat about runtime changes for this. I would be happy to help with it.

In theory it should not be a problem, because we don't rely on fixed buffer sizes and always serialize the buffer size with the network data.

The only immediate problem that comes to my mind would be when the sent buffer is too large for the receiving buffers, but I think we could come up with a solution. ;)
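
As an illustration of what "serializing the buffer size with the network
data" means in practice (this is not Flink's network stack, just a minimal
length-prefix framing sketch with made-up names):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Sketch only: ship variable-sized buffers by writing their length first. */
public final class LengthPrefixedFraming {

    static void writeFrame(OutputStream out, byte[] payload) throws IOException {
        DataOutputStream dos = new DataOutputStream(out);
        dos.writeInt(payload.length);   // the buffer size travels with the data
        dos.write(payload);
        dos.flush();
    }

    static byte[] readFrame(InputStream in) throws IOException {
        DataInputStream dis = new DataInputStream(in);
        int length = dis.readInt();     // receiver learns the size first
        byte[] payload = new byte[length];
        dis.readFully(payload);
        return payload;
    }
}

Because the receiver reads the length before the payload, nothing in the
framing itself assumes a fixed buffer size.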


Re: Adding the streaming project to the main repository

Gyula Fóra
Okay, let's have a chat about this sometime; maybe we can come up with
something even better.

I already talked briefly with Stephan about the possibility of adjusting
the output buffers at runtime, but it seemed like the automatic flushing
was a far easier and possibly even better choice. (It shouldn't even cause
a significant overhead, since we are just sleeping in another thread.)

It would also be cool if you could introduce us more deeply to this
network layer, because it's really a lot of code to look at without a
guide :D


