Periodic full stream aggregations

classic Classic list List threaded Threaded
10 messages Options
Reply | Threaded
Open this post in threaded view
|

Periodic full stream aggregations

Gyula Fóra-2
Hey all,

I think we are missing a quite useful feature that could be implemented
(with some slight modifications) on top of the current windowing api.

We currently provide 2 ways of aggregating (or reducing) over streams:
doing a continuous aggregation and always output the aggregated value
(which cannot be done properly in parallel) or doing aggregation in a
window periodically.

What we don't have at the moment is periodic aggregations on the whole
stream. I would even go as far as to remove the continuous outputting
reduce/aggregate it and replace it with this version as this in return can
be done properly in parallel.

My suggestion would be that a call:

dataStream.reduce(..)
dataStream.sum(..)

would return a windowed data stream where the window is the whole record
history, and the user would need to define a trigger to get the actual
reduced values like:

dataStream.reduce(...).every(Time.of(4,sec)) to get the actual reduced
results.
dataStream.sum(...).every(...)

I think the current data stream reduce/aggregation is very confusing
without being practical for any normal use-case.

Also this would be a very api breaking change (but I would still make this
change as it is much more intuitive than the current behaviour) so I would
try to push it before the release if we can agree.

Cheers,
Gyula
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Bruno Cadonna
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hi Gyula,

I have a question regarding your suggestion.

Can the current continuous aggregation be also specified with your
proposed periodic aggregation?

I am thinking about something like

dataStream.reduce(...).every(Count.of(1))

Cheers,
Bruno

On 20.04.2015 22:32, Gyula Fóra wrote:

> Hey all,
>
> I think we are missing a quite useful feature that could be
> implemented (with some slight modifications) on top of the current
> windowing api.
>
> We currently provide 2 ways of aggregating (or reducing) over
> streams: doing a continuous aggregation and always output the
> aggregated value (which cannot be done properly in parallel) or
> doing aggregation in a window periodically.
>
> What we don't have at the moment is periodic aggregations on the
> whole stream. I would even go as far as to remove the continuous
> outputting reduce/aggregate it and replace it with this version as
> this in return can be done properly in parallel.
>
> My suggestion would be that a call:
>
> dataStream.reduce(..) dataStream.sum(..)
>
> would return a windowed data stream where the window is the whole
> record history, and the user would need to define a trigger to get
> the actual reduced values like:
>
> dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> reduced results. dataStream.sum(...).every(...)
>
> I think the current data stream reduce/aggregation is very
> confusing without being practical for any normal use-case.
>
> Also this would be a very api breaking change (but I would still
> make this change as it is much more intuitive than the current
> behaviour) so I would try to push it before the release if we can
> agree.
>
> Cheers, Gyula
>

- --
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (GNU/Linux)

iQEcBAEBAgAGBQJVNgaeAAoJEKdCIJx7flKwfiMH/AzPpKtse9eMOzFsXSuBslNr
PZRQ0vpI7vw9eYFIuqp33SltN0zmLmDt3VzgJz0EZK5zSRCF9NOeke1emQwlrPsB
g65a4XccWT2qPotodF39jTTdE5epeUf8NdE552sr+Ya5LMtt8TmozD0lEOVfNt7n
R6KQdDU70U0zoCPwv0S13cak8a8k7phGvShXeW4nSZKp8C+WJa3IbUZkHlIlkC1L
OnyYy4b14bnfjiknKt2mKcjLG7eQEq0X6aN85Zf+5X8BUg3auk9N9Cva2XMRuD1p
gOoC+2gPZcr2IB9Sgs+s5pxfhaoVpbQ9Z7gRh8BkWqftveA7RD6KymmBxoUtujA=
=8bVQ
-----END PGP SIGNATURE-----
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Gyula Fóra-2
Hi Bruno,

Of course you can do that as well. (That's the good part :p )

I will open a PR soon with the proposed changes (first without breaking the
current Api) and I will post it here.

Cheers,
Gyula

On Tuesday, April 21, 2015, Bruno Cadonna <[hidden email]>
wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> Hi Gyula,
>
> I have a question regarding your suggestion.
>
> Can the current continuous aggregation be also specified with your
> proposed periodic aggregation?
>
> I am thinking about something like
>
> dataStream.reduce(...).every(Count.of(1))
>
> Cheers,
> Bruno
>
> On 20.04.2015 22:32, Gyula Fóra wrote:
> > Hey all,
> >
> > I think we are missing a quite useful feature that could be
> > implemented (with some slight modifications) on top of the current
> > windowing api.
> >
> > We currently provide 2 ways of aggregating (or reducing) over
> > streams: doing a continuous aggregation and always output the
> > aggregated value (which cannot be done properly in parallel) or
> > doing aggregation in a window periodically.
> >
> > What we don't have at the moment is periodic aggregations on the
> > whole stream. I would even go as far as to remove the continuous
> > outputting reduce/aggregate it and replace it with this version as
> > this in return can be done properly in parallel.
> >
> > My suggestion would be that a call:
> >
> > dataStream.reduce(..) dataStream.sum(..)
> >
> > would return a windowed data stream where the window is the whole
> > record history, and the user would need to define a trigger to get
> > the actual reduced values like:
> >
> > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> > reduced results. dataStream.sum(...).every(...)
> >
> > I think the current data stream reduce/aggregation is very
> > confusing without being practical for any normal use-case.
> >
> > Also this would be a very api breaking change (but I would still
> > make this change as it is much more intuitive than the current
> > behaviour) so I would try to push it before the release if we can
> > agree.
> >
> > Cheers, Gyula
> >
>
> - --
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
>   Dr. Bruno Cadonna
>   Postdoctoral Researcher
>
>   Databases and Information Systems
>   Department of Computer Science
>   Humboldt-Universität zu Berlin
>
>   http://www.informatik.hu-berlin.de/~cadonnab
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> -----BEGIN PGP SIGNATURE-----
> Version: GnuPG v1.4.11 (GNU/Linux)
>
> iQEcBAEBAgAGBQJVNgaeAAoJEKdCIJx7flKwfiMH/AzPpKtse9eMOzFsXSuBslNr
> PZRQ0vpI7vw9eYFIuqp33SltN0zmLmDt3VzgJz0EZK5zSRCF9NOeke1emQwlrPsB
> g65a4XccWT2qPotodF39jTTdE5epeUf8NdE552sr+Ya5LMtt8TmozD0lEOVfNt7n
> R6KQdDU70U0zoCPwv0S13cak8a8k7phGvShXeW4nSZKp8C+WJa3IbUZkHlIlkC1L
> OnyYy4b14bnfjiknKt2mKcjLG7eQEq0X6aN85Zf+5X8BUg3auk9N9Cva2XMRuD1p
> gOoC+2gPZcr2IB9Sgs+s5pxfhaoVpbQ9Z7gRh8BkWqftveA7RD6KymmBxoUtujA=
> =8bVQ
> -----END PGP SIGNATURE-----
>
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Fabian Hueske-2
Is it possible to switch the order of the statements, i.e.,

dataStream.every(Time.of(4,sec)).reduce(...) instead of
dataStream.reduce(...).every(Time.of(4,sec))

I think that would be more consistent with the structure of the remaining
API.

Cheers, Fabian

2015-04-21 10:57 GMT+02:00 Gyula Fóra <[hidden email]>:

> Hi Bruno,
>
> Of course you can do that as well. (That's the good part :p )
>
> I will open a PR soon with the proposed changes (first without breaking the
> current Api) and I will post it here.
>
> Cheers,
> Gyula
>
> On Tuesday, April 21, 2015, Bruno Cadonna <[hidden email]
> >
> wrote:
>
> > -----BEGIN PGP SIGNED MESSAGE-----
> > Hash: SHA1
> >
> > Hi Gyula,
> >
> > I have a question regarding your suggestion.
> >
> > Can the current continuous aggregation be also specified with your
> > proposed periodic aggregation?
> >
> > I am thinking about something like
> >
> > dataStream.reduce(...).every(Count.of(1))
> >
> > Cheers,
> > Bruno
> >
> > On 20.04.2015 22:32, Gyula Fóra wrote:
> > > Hey all,
> > >
> > > I think we are missing a quite useful feature that could be
> > > implemented (with some slight modifications) on top of the current
> > > windowing api.
> > >
> > > We currently provide 2 ways of aggregating (or reducing) over
> > > streams: doing a continuous aggregation and always output the
> > > aggregated value (which cannot be done properly in parallel) or
> > > doing aggregation in a window periodically.
> > >
> > > What we don't have at the moment is periodic aggregations on the
> > > whole stream. I would even go as far as to remove the continuous
> > > outputting reduce/aggregate it and replace it with this version as
> > > this in return can be done properly in parallel.
> > >
> > > My suggestion would be that a call:
> > >
> > > dataStream.reduce(..) dataStream.sum(..)
> > >
> > > would return a windowed data stream where the window is the whole
> > > record history, and the user would need to define a trigger to get
> > > the actual reduced values like:
> > >
> > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> > > reduced results. dataStream.sum(...).every(...)
> > >
> > > I think the current data stream reduce/aggregation is very
> > > confusing without being practical for any normal use-case.
> > >
> > > Also this would be a very api breaking change (but I would still
> > > make this change as it is much more intuitive than the current
> > > behaviour) so I would try to push it before the release if we can
> > > agree.
> > >
> > > Cheers, Gyula
> > >
> >
> > - --
> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> >
> >   Dr. Bruno Cadonna
> >   Postdoctoral Researcher
> >
> >   Databases and Information Systems
> >   Department of Computer Science
> >   Humboldt-Universität zu Berlin
> >
> >   http://www.informatik.hu-berlin.de/~cadonnab
> >
> > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> > -----BEGIN PGP SIGNATURE-----
> > Version: GnuPG v1.4.11 (GNU/Linux)
> >
> > iQEcBAEBAgAGBQJVNgaeAAoJEKdCIJx7flKwfiMH/AzPpKtse9eMOzFsXSuBslNr
> > PZRQ0vpI7vw9eYFIuqp33SltN0zmLmDt3VzgJz0EZK5zSRCF9NOeke1emQwlrPsB
> > g65a4XccWT2qPotodF39jTTdE5epeUf8NdE552sr+Ya5LMtt8TmozD0lEOVfNt7n
> > R6KQdDU70U0zoCPwv0S13cak8a8k7phGvShXeW4nSZKp8C+WJa3IbUZkHlIlkC1L
> > OnyYy4b14bnfjiknKt2mKcjLG7eQEq0X6aN85Zf+5X8BUg3auk9N9Cva2XMRuD1p
> > gOoC+2gPZcr2IB9Sgs+s5pxfhaoVpbQ9Z7gRh8BkWqftveA7RD6KymmBxoUtujA=
> > =8bVQ
> > -----END PGP SIGNATURE-----
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Gyula Fóra
Thats a good idea, I will modify my PR to that :)

Gyula

On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske <[hidden email]> wrote:

> Is it possible to switch the order of the statements, i.e.,
>
> dataStream.every(Time.of(4,sec)).reduce(...) instead of
> dataStream.reduce(...).every(Time.of(4,sec))
>
> I think that would be more consistent with the structure of the remaining
> API.
>
> Cheers, Fabian
>
> 2015-04-21 10:57 GMT+02:00 Gyula Fóra <[hidden email]>:
>
> > Hi Bruno,
> >
> > Of course you can do that as well. (That's the good part :p )
> >
> > I will open a PR soon with the proposed changes (first without breaking
> the
> > current Api) and I will post it here.
> >
> > Cheers,
> > Gyula
> >
> > On Tuesday, April 21, 2015, Bruno Cadonna <
> [hidden email]
> > >
> > wrote:
> >
> > > -----BEGIN PGP SIGNED MESSAGE-----
> > > Hash: SHA1
> > >
> > > Hi Gyula,
> > >
> > > I have a question regarding your suggestion.
> > >
> > > Can the current continuous aggregation be also specified with your
> > > proposed periodic aggregation?
> > >
> > > I am thinking about something like
> > >
> > > dataStream.reduce(...).every(Count.of(1))
> > >
> > > Cheers,
> > > Bruno
> > >
> > > On 20.04.2015 22:32, Gyula Fóra wrote:
> > > > Hey all,
> > > >
> > > > I think we are missing a quite useful feature that could be
> > > > implemented (with some slight modifications) on top of the current
> > > > windowing api.
> > > >
> > > > We currently provide 2 ways of aggregating (or reducing) over
> > > > streams: doing a continuous aggregation and always output the
> > > > aggregated value (which cannot be done properly in parallel) or
> > > > doing aggregation in a window periodically.
> > > >
> > > > What we don't have at the moment is periodic aggregations on the
> > > > whole stream. I would even go as far as to remove the continuous
> > > > outputting reduce/aggregate it and replace it with this version as
> > > > this in return can be done properly in parallel.
> > > >
> > > > My suggestion would be that a call:
> > > >
> > > > dataStream.reduce(..) dataStream.sum(..)
> > > >
> > > > would return a windowed data stream where the window is the whole
> > > > record history, and the user would need to define a trigger to get
> > > > the actual reduced values like:
> > > >
> > > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
> > > > reduced results. dataStream.sum(...).every(...)
> > > >
> > > > I think the current data stream reduce/aggregation is very
> > > > confusing without being practical for any normal use-case.
> > > >
> > > > Also this would be a very api breaking change (but I would still
> > > > make this change as it is much more intuitive than the current
> > > > behaviour) so I would try to push it before the release if we can
> > > > agree.
> > > >
> > > > Cheers, Gyula
> > > >
> > >
> > > - --
> > > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> > >
> > >   Dr. Bruno Cadonna
> > >   Postdoctoral Researcher
> > >
> > >   Databases and Information Systems
> > >   Department of Computer Science
> > >   Humboldt-Universität zu Berlin
> > >
> > >   http://www.informatik.hu-berlin.de/~cadonnab
> > >
> > > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> > > -----BEGIN PGP SIGNATURE-----
> > > Version: GnuPG v1.4.11 (GNU/Linux)
> > >
> > > iQEcBAEBAgAGBQJVNgaeAAoJEKdCIJx7flKwfiMH/AzPpKtse9eMOzFsXSuBslNr
> > > PZRQ0vpI7vw9eYFIuqp33SltN0zmLmDt3VzgJz0EZK5zSRCF9NOeke1emQwlrPsB
> > > g65a4XccWT2qPotodF39jTTdE5epeUf8NdE552sr+Ya5LMtt8TmozD0lEOVfNt7n
> > > R6KQdDU70U0zoCPwv0S13cak8a8k7phGvShXeW4nSZKp8C+WJa3IbUZkHlIlkC1L
> > > OnyYy4b14bnfjiknKt2mKcjLG7eQEq0X6aN85Zf+5X8BUg3auk9N9Cva2XMRuD1p
> > > gOoC+2gPZcr2IB9Sgs+s5pxfhaoVpbQ9Z7gRh8BkWqftveA7RD6KymmBxoUtujA=
> > > =8bVQ
> > > -----END PGP SIGNATURE-----
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Gyula Fóra
I have opened a PR for this feature:

https://github.com/apache/flink/pull/614

Cheers,
Gyula

On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra <[hidden email]> wrote:

> Thats a good idea, I will modify my PR to that :)
>
> Gyula
>
> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske <[hidden email]> wrote:
>
>> Is it possible to switch the order of the statements, i.e.,
>>
>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
>> dataStream.reduce(...).every(Time.of(4,sec))
>>
>> I think that would be more consistent with the structure of the remaining
>> API.
>>
>> Cheers, Fabian
>>
>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra <[hidden email]>:
>>
>> > Hi Bruno,
>> >
>> > Of course you can do that as well. (That's the good part :p )
>> >
>> > I will open a PR soon with the proposed changes (first without breaking
>> the
>> > current Api) and I will post it here.
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Tuesday, April 21, 2015, Bruno Cadonna <
>> [hidden email]
>> > >
>> > wrote:
>> >
>> > > -----BEGIN PGP SIGNED MESSAGE-----
>> > > Hash: SHA1
>> > >
>> > > Hi Gyula,
>> > >
>> > > I have a question regarding your suggestion.
>> > >
>> > > Can the current continuous aggregation be also specified with your
>> > > proposed periodic aggregation?
>> > >
>> > > I am thinking about something like
>> > >
>> > > dataStream.reduce(...).every(Count.of(1))
>> > >
>> > > Cheers,
>> > > Bruno
>> > >
>> > > On 20.04.2015 22:32, Gyula Fóra wrote:
>> > > > Hey all,
>> > > >
>> > > > I think we are missing a quite useful feature that could be
>> > > > implemented (with some slight modifications) on top of the current
>> > > > windowing api.
>> > > >
>> > > > We currently provide 2 ways of aggregating (or reducing) over
>> > > > streams: doing a continuous aggregation and always output the
>> > > > aggregated value (which cannot be done properly in parallel) or
>> > > > doing aggregation in a window periodically.
>> > > >
>> > > > What we don't have at the moment is periodic aggregations on the
>> > > > whole stream. I would even go as far as to remove the continuous
>> > > > outputting reduce/aggregate it and replace it with this version as
>> > > > this in return can be done properly in parallel.
>> > > >
>> > > > My suggestion would be that a call:
>> > > >
>> > > > dataStream.reduce(..) dataStream.sum(..)
>> > > >
>> > > > would return a windowed data stream where the window is the whole
>> > > > record history, and the user would need to define a trigger to get
>> > > > the actual reduced values like:
>> > > >
>> > > > dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
>> > > > reduced results. dataStream.sum(...).every(...)
>> > > >
>> > > > I think the current data stream reduce/aggregation is very
>> > > > confusing without being practical for any normal use-case.
>> > > >
>> > > > Also this would be a very api breaking change (but I would still
>> > > > make this change as it is much more intuitive than the current
>> > > > behaviour) so I would try to push it before the release if we can
>> > > > agree.
>> > > >
>> > > > Cheers, Gyula
>> > > >
>> > >
>> > > - --
>> > > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>> > >
>> > >   Dr. Bruno Cadonna
>> > >   Postdoctoral Researcher
>> > >
>> > >   Databases and Information Systems
>> > >   Department of Computer Science
>> > >   Humboldt-Universität zu Berlin
>> > >
>> > >   http://www.informatik.hu-berlin.de/~cadonnab
>> > >
>> > > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>> > > -----BEGIN PGP SIGNATURE-----
>> > > Version: GnuPG v1.4.11 (GNU/Linux)
>> > >
>> > > iQEcBAEBAgAGBQJVNgaeAAoJEKdCIJx7flKwfiMH/AzPpKtse9eMOzFsXSuBslNr
>> > > PZRQ0vpI7vw9eYFIuqp33SltN0zmLmDt3VzgJz0EZK5zSRCF9NOeke1emQwlrPsB
>> > > g65a4XccWT2qPotodF39jTTdE5epeUf8NdE552sr+Ya5LMtt8TmozD0lEOVfNt7n
>> > > R6KQdDU70U0zoCPwv0S13cak8a8k7phGvShXeW4nSZKp8C+WJa3IbUZkHlIlkC1L
>> > > OnyYy4b14bnfjiknKt2mKcjLG7eQEq0X6aN85Zf+5X8BUg3auk9N9Cva2XMRuD1p
>> > > gOoC+2gPZcr2IB9Sgs+s5pxfhaoVpbQ9Z7gRh8BkWqftveA7RD6KymmBxoUtujA=
>> > > =8bVQ
>> > > -----END PGP SIGNATURE-----
>> > >
>> >
>>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Bruno Cadonna
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hi Gyula,

I read your comments of your PR.

I have a question to this comment:

"It only allows aggregations so we dont need to keep the full history
in a buffer."

What if the user implements an aggregation function like a median?

For a median you need the full history, don't you?

Am I missing something?

Cheers,
Bruno

On 21.04.2015 14:31, Gyula Fóra wrote:

> I have opened a PR for this feature:
>
> https://github.com/apache/flink/pull/614
>
> Cheers, Gyula
>
> On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra <[hidden email]>
> wrote:
>
>> Thats a good idea, I will modify my PR to that :)
>>
>> Gyula
>>
>> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
>> <[hidden email]> wrote:
>>
>>> Is it possible to switch the order of the statements, i.e.,
>>>
>>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
>>> dataStream.reduce(...).every(Time.of(4,sec))
>>>
>>> I think that would be more consistent with the structure of the
>>> remaining API.
>>>
>>> Cheers, Fabian
>>>
>>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra <[hidden email]>:
>>>
>>>> Hi Bruno,
>>>>
>>>> Of course you can do that as well. (That's the good part :p
>>>> )
>>>>
>>>> I will open a PR soon with the proposed changes (first
>>>> without breaking
>>> the
>>>> current Api) and I will post it here.
>>>>
>>>> Cheers, Gyula
>>>>
>>>> On Tuesday, April 21, 2015, Bruno Cadonna <
>>> [hidden email]
>>>>>
>>>> wrote:
>>>>
> Hi Gyula,
>
> I have a question regarding your suggestion.
>
> Can the current continuous aggregation be also specified with your
> proposed periodic aggregation?
>
> I am thinking about something like
>
> dataStream.reduce(...).every(Count.of(1))
>
> Cheers, Bruno
>
> On 20.04.2015 22:32, Gyula Fóra wrote:
>>>>>>> Hey all,
>>>>>>>
>>>>>>> I think we are missing a quite useful feature that
>>>>>>> could be implemented (with some slight modifications)
>>>>>>> on top of the current windowing api.
>>>>>>>
>>>>>>> We currently provide 2 ways of aggregating (or
>>>>>>> reducing) over streams: doing a continuous aggregation
>>>>>>> and always output the aggregated value (which cannot be
>>>>>>> done properly in parallel) or doing aggregation in a
>>>>>>> window periodically.
>>>>>>>
>>>>>>> What we don't have at the moment is periodic
>>>>>>> aggregations on the whole stream. I would even go as
>>>>>>> far as to remove the continuous outputting
>>>>>>> reduce/aggregate it and replace it with this version
>>>>>>> as this in return can be done properly in parallel.
>>>>>>>
>>>>>>> My suggestion would be that a call:
>>>>>>>
>>>>>>> dataStream.reduce(..) dataStream.sum(..)
>>>>>>>
>>>>>>> would return a windowed data stream where the window is
>>>>>>> the whole record history, and the user would need to
>>>>>>> define a trigger to get the actual reduced values
>>>>>>> like:
>>>>>>>
>>>>>>> dataStream.reduce(...).every(Time.of(4,sec)) to get the
>>>>>>> actual reduced results. dataStream.sum(...).every(...)
>>>>>>>
>>>>>>> I think the current data stream reduce/aggregation is
>>>>>>> very confusing without being practical for any normal
>>>>>>> use-case.
>>>>>>>
>>>>>>> Also this would be a very api breaking change (but I
>>>>>>> would still make this change as it is much more
>>>>>>> intuitive than the current behaviour) so I would try to
>>>>>>> push it before the release if we can agree.
>>>>>>>
>>>>>>> Cheers, Gyula
>>>>>>>
>
>>>>>
>>>>
>>>
>>
>>
>

- --
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (GNU/Linux)

iQEcBAEBAgAGBQJVNkgYAAoJEKdCIJx7flKw7rUIAMmu80ZuMvfA/BvQemkEo7As
bU3iWre+e3OUWNRLuf2JfG9CHMKFSjBJG6Jax/pWZBXTYh8oaYDrYixq7e+vljqf
P9ypurhd1h8In71aSUyUPIsrTg6aJ5xo/beUxA6LFbB2LpVqawNDe0gjn3ZRMobM
zmn962kqp0oHAVipYI2mzEU6RNl1Kh0PoaLaZRLRh+dlgKofqDFcBiB3hhG/VEoF
sCsCAsC1bXtpToPRZ29cRcEfpHcnE3zCgivPeG83JsWYr4mIEj7gp+smFUz0PjoI
1wHv/pnZJS4Onk38HH1GcP95/uYpqm4gz3OBCuE7v+3b1bI852bIvnUZrCGLOew=
=u1R0
-----END PGP SIGNATURE-----
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Gyula Fóra
You are right, but you should never try to compute full stream median,
thats the point :D

On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna <
[hidden email]> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> Hi Gyula,
>
> I read your comments of your PR.
>
> I have a question to this comment:
>
> "It only allows aggregations so we dont need to keep the full history
> in a buffer."
>
> What if the user implements an aggregation function like a median?
>
> For a median you need the full history, don't you?
>
> Am I missing something?
>
> Cheers,
> Bruno
>
> On 21.04.2015 14:31, Gyula Fóra wrote:
> > I have opened a PR for this feature:
> >
> > https://github.com/apache/flink/pull/614
> >
> > Cheers, Gyula
> >
> > On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra <[hidden email]>
> > wrote:
> >
> >> Thats a good idea, I will modify my PR to that :)
> >>
> >> Gyula
> >>
> >> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
> >> <[hidden email]> wrote:
> >>
> >>> Is it possible to switch the order of the statements, i.e.,
> >>>
> >>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
> >>> dataStream.reduce(...).every(Time.of(4,sec))
> >>>
> >>> I think that would be more consistent with the structure of the
> >>> remaining API.
> >>>
> >>> Cheers, Fabian
> >>>
> >>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra <[hidden email]>:
> >>>
> >>>> Hi Bruno,
> >>>>
> >>>> Of course you can do that as well. (That's the good part :p
> >>>> )
> >>>>
> >>>> I will open a PR soon with the proposed changes (first
> >>>> without breaking
> >>> the
> >>>> current Api) and I will post it here.
> >>>>
> >>>> Cheers, Gyula
> >>>>
> >>>> On Tuesday, April 21, 2015, Bruno Cadonna <
> >>> [hidden email]
> >>>>>
> >>>> wrote:
> >>>>
> > Hi Gyula,
> >
> > I have a question regarding your suggestion.
> >
> > Can the current continuous aggregation be also specified with your
> > proposed periodic aggregation?
> >
> > I am thinking about something like
> >
> > dataStream.reduce(...).every(Count.of(1))
> >
> > Cheers, Bruno
> >
> > On 20.04.2015 22:32, Gyula Fóra wrote:
> >>>>>>> Hey all,
> >>>>>>>
> >>>>>>> I think we are missing a quite useful feature that
> >>>>>>> could be implemented (with some slight modifications)
> >>>>>>> on top of the current windowing api.
> >>>>>>>
> >>>>>>> We currently provide 2 ways of aggregating (or
> >>>>>>> reducing) over streams: doing a continuous aggregation
> >>>>>>> and always output the aggregated value (which cannot be
> >>>>>>> done properly in parallel) or doing aggregation in a
> >>>>>>> window periodically.
> >>>>>>>
> >>>>>>> What we don't have at the moment is periodic
> >>>>>>> aggregations on the whole stream. I would even go as
> >>>>>>> far as to remove the continuous outputting
> >>>>>>> reduce/aggregate it and replace it with this version
> >>>>>>> as this in return can be done properly in parallel.
> >>>>>>>
> >>>>>>> My suggestion would be that a call:
> >>>>>>>
> >>>>>>> dataStream.reduce(..) dataStream.sum(..)
> >>>>>>>
> >>>>>>> would return a windowed data stream where the window is
> >>>>>>> the whole record history, and the user would need to
> >>>>>>> define a trigger to get the actual reduced values
> >>>>>>> like:
> >>>>>>>
> >>>>>>> dataStream.reduce(...).every(Time.of(4,sec)) to get the
> >>>>>>> actual reduced results. dataStream.sum(...).every(...)
> >>>>>>>
> >>>>>>> I think the current data stream reduce/aggregation is
> >>>>>>> very confusing without being practical for any normal
> >>>>>>> use-case.
> >>>>>>>
> >>>>>>> Also this would be a very api breaking change (but I
> >>>>>>> would still make this change as it is much more
> >>>>>>> intuitive than the current behaviour) so I would try to
> >>>>>>> push it before the release if we can agree.
> >>>>>>>
> >>>>>>> Cheers, Gyula
> >>>>>>>
> >
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
> - --
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
>   Dr. Bruno Cadonna
>   Postdoctoral Researcher
>
>   Databases and Information Systems
>   Department of Computer Science
>   Humboldt-Universität zu Berlin
>
>   http://www.informatik.hu-berlin.de/~cadonnab
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> -----BEGIN PGP SIGNATURE-----
> Version: GnuPG v1.4.11 (GNU/Linux)
>
> iQEcBAEBAgAGBQJVNkgYAAoJEKdCIJx7flKw7rUIAMmu80ZuMvfA/BvQemkEo7As
> bU3iWre+e3OUWNRLuf2JfG9CHMKFSjBJG6Jax/pWZBXTYh8oaYDrYixq7e+vljqf
> P9ypurhd1h8In71aSUyUPIsrTg6aJ5xo/beUxA6LFbB2LpVqawNDe0gjn3ZRMobM
> zmn962kqp0oHAVipYI2mzEU6RNl1Kh0PoaLaZRLRh+dlgKofqDFcBiB3hhG/VEoF
> sCsCAsC1bXtpToPRZ29cRcEfpHcnE3zCgivPeG83JsWYr4mIEj7gp+smFUz0PjoI
> 1wHv/pnZJS4Onk38HH1GcP95/uYpqm4gz3OBCuE7v+3b1bI852bIvnUZrCGLOew=
> =u1R0
> -----END PGP SIGNATURE-----
>
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Bruno Cadonna
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hi Gyula,

fair enough!

I used a bad example.

What I really wanted to know is whether your code supports only
aggregation like sum, min, and max where you need to pass only a value
to the next aggregation or also more complex data structures, e.g., a
synopsis of the full stream, to compute an aggregation such as an
approximate count distinct (item count)?

Cheers,
Bruno

On 21.04.2015 15:18, Gyula Fóra wrote:

> You are right, but you should never try to compute full stream
> median, thats the point :D
>
> On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna <
> [hidden email]> wrote:
>
> Hi Gyula,
>
> I read your comments of your PR.
>
> I have a question to this comment:
>
> "It only allows aggregations so we dont need to keep the full
> history in a buffer."
>
> What if the user implements an aggregation function like a median?
>
> For a median you need the full history, don't you?
>
> Am I missing something?
>
> Cheers, Bruno
>
> On 21.04.2015 14:31, Gyula Fóra wrote:
>>>> I have opened a PR for this feature:
>>>>
>>>> https://github.com/apache/flink/pull/614
>>>>
>>>> Cheers, Gyula
>>>>
>>>> On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra
>>>> <[hidden email]> wrote:
>>>>
>>>>> Thats a good idea, I will modify my PR to that :)
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
>>>>> <[hidden email]> wrote:
>>>>>
>>>>>> Is it possible to switch the order of the statements,
>>>>>> i.e.,
>>>>>>
>>>>>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
>>>>>> dataStream.reduce(...).every(Time.of(4,sec))
>>>>>>
>>>>>> I think that would be more consistent with the structure
>>>>>> of the remaining API.
>>>>>>
>>>>>> Cheers, Fabian
>>>>>>
>>>>>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra
>>>>>> <[hidden email]>:
>>>>>>
>>>>>>> Hi Bruno,
>>>>>>>
>>>>>>> Of course you can do that as well. (That's the good
>>>>>>> part :p )
>>>>>>>
>>>>>>> I will open a PR soon with the proposed changes (first
>>>>>>> without breaking
>>>>>> the
>>>>>>> current Api) and I will post it here.
>>>>>>>
>>>>>>> Cheers, Gyula
>>>>>>>
>>>>>>> On Tuesday, April 21, 2015, Bruno Cadonna <
>>>>>> [hidden email]
>>>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>> Hi Gyula,
>>>>
>>>> I have a question regarding your suggestion.
>>>>
>>>> Can the current continuous aggregation be also specified with
>>>> your proposed periodic aggregation?
>>>>
>>>> I am thinking about something like
>>>>
>>>> dataStream.reduce(...).every(Count.of(1))
>>>>
>>>> Cheers, Bruno
>>>>
>>>> On 20.04.2015 22:32, Gyula Fóra wrote:
>>>>>>>>>> Hey all,
>>>>>>>>>>
>>>>>>>>>> I think we are missing a quite useful feature
>>>>>>>>>> that could be implemented (with some slight
>>>>>>>>>> modifications) on top of the current windowing
>>>>>>>>>> api.
>>>>>>>>>>
>>>>>>>>>> We currently provide 2 ways of aggregating (or
>>>>>>>>>> reducing) over streams: doing a continuous
>>>>>>>>>> aggregation and always output the aggregated
>>>>>>>>>> value (which cannot be done properly in parallel)
>>>>>>>>>> or doing aggregation in a window periodically.
>>>>>>>>>>
>>>>>>>>>> What we don't have at the moment is periodic
>>>>>>>>>> aggregations on the whole stream. I would even go
>>>>>>>>>> as far as to remove the continuous outputting
>>>>>>>>>> reduce/aggregate it and replace it with this
>>>>>>>>>> version as this in return can be done properly in
>>>>>>>>>> parallel.
>>>>>>>>>>
>>>>>>>>>> My suggestion would be that a call:
>>>>>>>>>>
>>>>>>>>>> dataStream.reduce(..) dataStream.sum(..)
>>>>>>>>>>
>>>>>>>>>> would return a windowed data stream where the
>>>>>>>>>> window is the whole record history, and the user
>>>>>>>>>> would need to define a trigger to get the actual
>>>>>>>>>> reduced values like:
>>>>>>>>>>
>>>>>>>>>> dataStream.reduce(...).every(Time.of(4,sec)) to
>>>>>>>>>> get the actual reduced results.
>>>>>>>>>> dataStream.sum(...).every(...)
>>>>>>>>>>
>>>>>>>>>> I think the current data stream
>>>>>>>>>> reduce/aggregation is very confusing without
>>>>>>>>>> being practical for any normal use-case.
>>>>>>>>>>
>>>>>>>>>> Also this would be a very api breaking change
>>>>>>>>>> (but I would still make this change as it is much
>>>>>>>>>> more intuitive than the current behaviour) so I
>>>>>>>>>> would try to push it before the release if we can
>>>>>>>>>> agree.
>>>>>>>>>>
>>>>>>>>>> Cheers, Gyula
>>>>>>>>>>
>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>
>>
>

- --
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1

iQEcBAEBAgAGBQJVNrgwAAoJEKdCIJx7flKwBbUIALXaXY3WuQw5ZG/TPrUZLl7d
jLI0syhM62rv8larlpC6xGLxIHDDLfABSD/F+amXE6afmYqM4cb2R9tsjWuRzKt8
IWJoqT17EetTw82brOfy+kLCdm+URbPa1IzbuGeg02/zx/DmWXavnBilwSr679mC
kbaGPgQ/6mVN6p4GL873CXhep4R89YQVmIG+9pQaesvh//lqTkV/8eXjP2jKN4Oq
gYnWIwScJ9QfsyRj3jRs7lVLXeIq5ID94UkLryZnn5dEIRnoxfq6bHR0pVUbQJgp
jwZRtT5CX83U3KUvstZ0z6M6ButbCWq8ol2Gf6ZOVpZfzj68Fz1PtbZyJTFhpDU=
=bhGt
-----END PGP SIGNATURE-----
Reply | Threaded
Open this post in threaded view
|

Re: Periodic full stream aggregations

Gyula Fóra
Hey,

The current code supports 2 types of aggregations, simple binary reduce:
T,T=>T and also the grouped version for this, where the reduce function is
applied per a user defined key (so there we keep a map of reduced values).
This can already be used to implement fairly complex logic if we transform
the data to a proper type before passing it to the reducer.

As a next step we can make this work with fold + combiners as well, where
your initial data type is T and your fould function is T,R => R and a
combiner is R,R => R.

At that point I think any sensible aggregation can be implemented.

Regards,
Gyula


On Tue, Apr 21, 2015 at 10:50 PM, Bruno Cadonna <
[hidden email]> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> Hi Gyula,
>
> fair enough!
>
> I used a bad example.
>
> What I really wanted to know is whether your code supports only
> aggregation like sum, min, and max where you need to pass only a value
> to the next aggregation or also more complex data structures, e.g., a
> synopsis of the full stream, to compute an aggregation such as an
> approximate count distinct (item count)?
>
> Cheers,
> Bruno
>
> On 21.04.2015 15:18, Gyula Fóra wrote:
> > You are right, but you should never try to compute full stream
> > median, thats the point :D
> >
> > On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna <
> > [hidden email]> wrote:
> >
> > Hi Gyula,
> >
> > I read your comments of your PR.
> >
> > I have a question to this comment:
> >
> > "It only allows aggregations so we dont need to keep the full
> > history in a buffer."
> >
> > What if the user implements an aggregation function like a median?
> >
> > For a median you need the full history, don't you?
> >
> > Am I missing something?
> >
> > Cheers, Bruno
> >
> > On 21.04.2015 14:31, Gyula Fóra wrote:
> >>>> I have opened a PR for this feature:
> >>>>
> >>>> https://github.com/apache/flink/pull/614
> >>>>
> >>>> Cheers, Gyula
> >>>>
> >>>> On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra
> >>>> <[hidden email]> wrote:
> >>>>
> >>>>> Thats a good idea, I will modify my PR to that :)
> >>>>>
> >>>>> Gyula
> >>>>>
> >>>>> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
> >>>>> <[hidden email]> wrote:
> >>>>>
> >>>>>> Is it possible to switch the order of the statements,
> >>>>>> i.e.,
> >>>>>>
> >>>>>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
> >>>>>> dataStream.reduce(...).every(Time.of(4,sec))
> >>>>>>
> >>>>>> I think that would be more consistent with the structure
> >>>>>> of the remaining API.
> >>>>>>
> >>>>>> Cheers, Fabian
> >>>>>>
> >>>>>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra
> >>>>>> <[hidden email]>:
> >>>>>>
> >>>>>>> Hi Bruno,
> >>>>>>>
> >>>>>>> Of course you can do that as well. (That's the good
> >>>>>>> part :p )
> >>>>>>>
> >>>>>>> I will open a PR soon with the proposed changes (first
> >>>>>>> without breaking
> >>>>>> the
> >>>>>>> current Api) and I will post it here.
> >>>>>>>
> >>>>>>> Cheers, Gyula
> >>>>>>>
> >>>>>>> On Tuesday, April 21, 2015, Bruno Cadonna <
> >>>>>> [hidden email]
> >>>>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>> Hi Gyula,
> >>>>
> >>>> I have a question regarding your suggestion.
> >>>>
> >>>> Can the current continuous aggregation be also specified with
> >>>> your proposed periodic aggregation?
> >>>>
> >>>> I am thinking about something like
> >>>>
> >>>> dataStream.reduce(...).every(Count.of(1))
> >>>>
> >>>> Cheers, Bruno
> >>>>
> >>>> On 20.04.2015 22:32, Gyula Fóra wrote:
> >>>>>>>>>> Hey all,
> >>>>>>>>>>
> >>>>>>>>>> I think we are missing a quite useful feature
> >>>>>>>>>> that could be implemented (with some slight
> >>>>>>>>>> modifications) on top of the current windowing
> >>>>>>>>>> api.
> >>>>>>>>>>
> >>>>>>>>>> We currently provide 2 ways of aggregating (or
> >>>>>>>>>> reducing) over streams: doing a continuous
> >>>>>>>>>> aggregation and always output the aggregated
> >>>>>>>>>> value (which cannot be done properly in parallel)
> >>>>>>>>>> or doing aggregation in a window periodically.
> >>>>>>>>>>
> >>>>>>>>>> What we don't have at the moment is periodic
> >>>>>>>>>> aggregations on the whole stream. I would even go
> >>>>>>>>>> as far as to remove the continuous outputting
> >>>>>>>>>> reduce/aggregate it and replace it with this
> >>>>>>>>>> version as this in return can be done properly in
> >>>>>>>>>> parallel.
> >>>>>>>>>>
> >>>>>>>>>> My suggestion would be that a call:
> >>>>>>>>>>
> >>>>>>>>>> dataStream.reduce(..) dataStream.sum(..)
> >>>>>>>>>>
> >>>>>>>>>> would return a windowed data stream where the
> >>>>>>>>>> window is the whole record history, and the user
> >>>>>>>>>> would need to define a trigger to get the actual
> >>>>>>>>>> reduced values like:
> >>>>>>>>>>
> >>>>>>>>>> dataStream.reduce(...).every(Time.of(4,sec)) to
> >>>>>>>>>> get the actual reduced results.
> >>>>>>>>>> dataStream.sum(...).every(...)
> >>>>>>>>>>
> >>>>>>>>>> I think the current data stream
> >>>>>>>>>> reduce/aggregation is very confusing without
> >>>>>>>>>> being practical for any normal use-case.
> >>>>>>>>>>
> >>>>>>>>>> Also this would be a very api breaking change
> >>>>>>>>>> (but I would still make this change as it is much
> >>>>>>>>>> more intuitive than the current behaviour) so I
> >>>>>>>>>> would try to push it before the release if we can
> >>>>>>>>>> agree.
> >>>>>>>>>>
> >>>>>>>>>> Cheers, Gyula
> >>>>>>>>>>
> >>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >
> >>
> >
>
> - --
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
>   Dr. Bruno Cadonna
>   Postdoctoral Researcher
>
>   Databases and Information Systems
>   Department of Computer Science
>   Humboldt-Universität zu Berlin
>
>   http://www.informatik.hu-berlin.de/~cadonnab
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> -----BEGIN PGP SIGNATURE-----
> Version: GnuPG v1
>
> iQEcBAEBAgAGBQJVNrgwAAoJEKdCIJx7flKwBbUIALXaXY3WuQw5ZG/TPrUZLl7d
> jLI0syhM62rv8larlpC6xGLxIHDDLfABSD/F+amXE6afmYqM4cb2R9tsjWuRzKt8
> IWJoqT17EetTw82brOfy+kLCdm+URbPa1IzbuGeg02/zx/DmWXavnBilwSr679mC
> kbaGPgQ/6mVN6p4GL873CXhep4R89YQVmIG+9pQaesvh//lqTkV/8eXjP2jKN4Oq
> gYnWIwScJ9QfsyRj3jRs7lVLXeIq5ID94UkLryZnn5dEIRnoxfq6bHR0pVUbQJgp
> jwZRtT5CX83U3KUvstZ0z6M6ButbCWq8ol2Gf6ZOVpZfzj68Fz1PtbZyJTFhpDU=
> =bhGt
> -----END PGP SIGNATURE-----
>