Adding a new operator

Adding a new operator

Andra Lungu
Hey guys,

I am trying to add a new runtime operator. To this end, I am following the guide here:
http://ci.apache.org/projects/flink/flink-docs-master/internals/add_operator.html
and the code itself.


From what I understood, run() in ReduceDriver, for instance, should be
called every time reduce() is called. However, I set a breakpoint on the
first if statement in ReduceDriver's run() method and called reduce() on a
DataSet. When debugging, the method never seems to be reached; I also tried
adding a log.info() there, but that doesn't get printed either. Obviously,
the same goes for System.out.println.
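
For reference, here is roughly the kind of call I am talking about (a
minimal sketch against the Java DataSet API; the data and the reduce
function are just placeholders):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class ReduceDebugJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, Long>> ds = env.fromElements(
                    new Tuple2<Long, Long>(1L, 10L),
                    new Tuple2<Long, Long>(1L, 20L),
                    new Tuple2<Long, Long>(2L, 30L));

            // reduce() on the whole (ungrouped) DataSet
            DataSet<Tuple2<Long, Long>> sum =
                    ds.reduce(new ReduceFunction<Tuple2<Long, Long>>() {
                        @Override
                        public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a,
                                                         Tuple2<Long, Long> b) {
                            return new Tuple2<Long, Long>(a.f0, a.f1 + b.f1);
                        }
                    });

            // Nothing runs until the program is actually executed; depending on
            // the Flink version, print() may or may not trigger execution itself.
            sum.print();
        }
    }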

Could someone explain the workflow a bit better? When exactly does run()
get called and what is ReduceDriver's role?

Thanks!
Andra

Re: Adding a new operator

Markus Holzemer
Hey Andra,
perhaps you are looking at the wrong ReduceDriver?
As you can see in the DriverStrategy enum, there are several different
ReduceDrivers, depending on the strategy the optimizer chooses.

best,
Markus


Re: Adding a new operator

Andra Lungu
Yes, Markus:

ds.reduce() -> AllReduceDriver
ds.groupBy().reduce() -> ReduceDriver

It's very intuitive ;)
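
Spelled out as a minimal sketch (continuing the example from my first mail,
so ds is a DataSet<Tuple2<Long, Long>> and SumFn is just an illustrative
ReduceFunction):

    // SumFn: a simple ReduceFunction that sums the second tuple field.
    public static class SumFn implements ReduceFunction<Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return new Tuple2<Long, Long>(a.f0, a.f1 + b.f1);
        }
    }

    // Ungrouped reduce: a single result for the entire DataSet.
    // This is the case that ends up in AllReduceDriver at runtime.
    DataSet<Tuple2<Long, Long>> total = ds.reduce(new SumFn());

    // Grouped reduce: one result per key (here, grouped on tuple field 0).
    // This is the case that ends up in ReduceDriver at runtime.
    DataSet<Tuple2<Long, Long>> perKey = ds.groupBy(0).reduce(new SumFn());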


Re: Adding a new operator

Fabian Hueske-2
Hi Andra,

is there a JIRA for the new runtime operator?

Adding a new operator is a lot of work and touches many core parts of the
system.
It would be good to start a discussion early in the process to make sure
that the design is aligned with the rest of the system.
Otherwise, duplicated work might be necessary before it can be added.

Cheers,
Fabian


Re: Adding a new operator

Vasiliki Kalavri
Hi,

Andra is working on a modified reduce operator, which would internally
create an aggregation tree.
This is work related to her thesis, and we want to use it in graph
computations on skewed inputs.
It may or may not be a good idea to add it as a Flink operator; we will
need to evaluate that (as part of the thesis), so we don't have a JIRA for
this :-)

-Vasia.


Re: Adding a new operator

Andra Lungu
Hi Fabian,

After a quick look at the current behaviour of Flink's combinable reduce, I
saw that it does something like this:
https://docs.google.com/drawings/d/1WfGJq1ZNQ-F0EQZ2TwEYS_861xc3fSdfJL9Z4VdXBQU/edit?usp=sharing

It basically iterates over the sorted records two by two and, whenever it
finds two records within the same key group, it reduces them.
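
A rough, non-Flink sketch of that pairwise pass, just to pin the idea down
(this is purely illustrative and not the actual driver code; the records
are assumed to arrive already sorted by key):

    import java.util.ArrayList;
    import java.util.List;

    // A record is modeled as {key, value}; the input is assumed to be sorted
    // by key, as the sort-based combine/reduce strategy guarantees.
    static List<long[]> pairwiseReduce(List<long[]> sortedRecords) {
        List<long[]> out = new ArrayList<long[]>();
        long[] current = null;
        for (long[] rec : sortedRecords) {
            if (current != null && current[0] == rec[0]) {
                // same key group: fold the next record into the running result
                current = new long[] { current[0], current[1] + rec[1] };
            } else {
                if (current != null) {
                    out.add(current);
                }
                current = rec.clone();
            }
        }
        if (current != null) {
            out.add(current);
        }
        return out;
    }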

What we would like to do (as Vasia said) as part of my thesis is to speed
up the regular reduce by turning it into a treeReduce() where appropriate
[in GSA, for example, we do a reduce; that could easily be hot-swapped with
treeReduce() when skewed vertices are detected]. This new operator would
take the number of levels as an additional parameter (val1, val2, numLevels)
and aggregate in levels:
https://docs.google.com/drawings/d/1X_yJBdZykB9oBTbACUy9Bdd7oG5eDPDicNKhPcZ71ik/edit?usp=sharing

The goal is to make computation for highly skewed graphs scale. If we map
one of the first nodes in the drawing to a vertex with high in-degree, it
will slow down computation with the first reduce approach. But I am sure we
could find many other use cases.
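
To make the aggregation tree concrete, here is a toy, single-machine sketch
of reducing level by level with a bounded fan-in per node (the fan-in plays
the same role as numLevels for a given input size; the names are purely
illustrative and this is not a proposed Flink API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BinaryOperator;

    // Toy tree aggregation: combine partial results level by level with a
    // fixed fan-in, instead of funnelling everything into one final reduce.
    static long treeReduce(List<Long> values, int fanIn, BinaryOperator<Long> reduceFn) {
        if (values.isEmpty() || fanIn < 2) {
            throw new IllegalArgumentException("need a non-empty input and fanIn >= 2");
        }
        List<Long> level = new ArrayList<Long>(values);
        while (level.size() > 1) {
            List<Long> next = new ArrayList<Long>();
            for (int i = 0; i < level.size(); i += fanIn) {
                long partial = level.get(i);
                for (int j = i + 1; j < Math.min(i + fanIn, level.size()); j++) {
                    partial = reduceFn.apply(partial, level.get(j));
                }
                // in the real operator, each partial would live on some worker
                next.add(partial);
            }
            level = next; // one tree level per iteration
        }
        return level.get(0);
    }

For example, treeReduce(partials, 2, Long::sum) would build a binary
aggregation tree over the partial values.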

Now, in order to write the treeReduce operator, I did some investigating.
It does not suffice to make reduce's run() method operate on levels; you
also need to ensure that the partial reduces at each level are executed on
different machines. This is where the tricky and fun part begins. How do
you know which reduce is executed on which machine? In which class is this
described?

I sure would hate to do duplicate work, and since this is the first time I
have looked at Flink's internals, I could also use some guidance.

Re: Adding a new operator

Kostas Tzoumas-2
Some form of tree aggregation is useful in many cases, and IMO a good
addition to the system.

Kostas


Re: Adding a new operator

Fabian Hueske-2
I am not sure if I got everything right.

Let me first explain how a Reduce operator is executed in Flink at the
moment:

Assume a data set that is randomly distributed across four machines and not
sorted.
When a Reduce function is applied to this data set, each of the four
machines runs a Combine operator (the operator applies the Reduce function).
The Combine operator receives a certain amount of memory, say 256MB. This
memory is filled with a subset of the data. The subset is sorted in memory
on the Reduce operator's grouping key and then forwarded to the Reduce
function, which processes all elements with the same key pair-wise. For
each unique grouping key in the memory buffer, there is only one result
element. The memory is filled again, sorted, and combined until all local
data has been combined. All resulting elements are then hash-partitioned
and sent to the corresponding nodes. Once this is done, all elements with
the same grouping key are on the same machine. The data is now fully sorted
(not only the data within a single memory buffer) and again pair-wise
reduced.
Your first picture seems to show the local pair-wise reduce on sorted data.
If I understood it correctly, the second picture is on a different level of
abstraction, showing the distributed execution of a tree-reduce.
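
Purely for intuition, the pipeline described above can be mimicked in a few
lines of toy Java (hash maps stand in for the memory-bounded sort and lists
stand in for machines; this is not how the runtime is actually implemented):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // A record is modeled as {key, value}; perMachineRecords holds the local
    // data of each machine.
    static Map<Long, Long> toyReducePipeline(List<List<long[]>> perMachineRecords,
                                             int parallelism) {
        // 1) Local combine on every "machine": one partial sum per key.
        List<Map<Long, Long>> combined = new ArrayList<Map<Long, Long>>();
        for (List<long[]> machine : perMachineRecords) {
            Map<Long, Long> partials = new HashMap<Long, Long>();
            for (long[] rec : machine) {
                Long old = partials.get(rec[0]);
                partials.put(rec[0], old == null ? rec[1] : old + rec[1]);
            }
            combined.add(partials);
        }

        // 2) Hash-partition the combined partials so that all partials for the
        //    same key land in the same target partition, and 3) do the final
        //    reduce per key inside each partition.
        List<Map<Long, Long>> partitions = new ArrayList<Map<Long, Long>>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new HashMap<Long, Long>());
        }
        for (Map<Long, Long> partials : combined) {
            for (Map.Entry<Long, Long> e : partials.entrySet()) {
                int target = (e.getKey().hashCode() & 0x7fffffff) % parallelism;
                Map<Long, Long> p = partitions.get(target);
                Long old = p.get(e.getKey());
                p.put(e.getKey(), old == null ? e.getValue() : old + e.getValue());
            }
        }

        // Merge the per-partition results into one view of the final result.
        Map<Long, Long> result = new HashMap<Long, Long>();
        for (Map<Long, Long> p : partitions) {
            result.putAll(p);
        }
        return result;
    }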

For highly skewed data, it makes sense to add one or more combine steps
before doing the final full reduce (I believe this is what Kostas is
referring to).
The semantics of such a tree-combined reduce are the same as for the
regular reduce. It would just be executed in a different way. Therefore,
the function interface should remain the same and there is no need to add
an API operator for that (except maybe a hint to the optimizer to enforce a
tree-combine execution strategy).

Is this what you are looking for or do you need a new API operator with
different semantics?

Best, Fabian
