API behavior with data sinks (lazy) and eager operations

Stephan Ewen
Hi there!

With the upcoming, more interactive extensions to the API (operations that
return results from the program to the client and therefore need to be
evaluated eagerly), we need to define how the different actions should behave.

Currently, nothing gets executed until the "env.execute()" call is made.
That allows a program to produce multiple data sinks at the same time, which
is a good feature.
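To make the current lazy behaviour concrete, here is a toy model. It is a hypothetical sketch, not Flink's API: `Env`, `DataSet`, `from_list`, `map`, `write` and `execute` are illustrative names only. Defining a sink merely records it in the plan; `execute()` then runs all recorded sinks as one job, so an operator shared by several sinks is evaluated once.

```python
from typing import Callable, Dict, List


class DataSet:
    """A node in the toy dataflow plan. Creating it computes nothing."""

    def __init__(self, env: "Env", name: str, compute: Callable[..., list], inputs: List["DataSet"]) -> None:
        self.env, self.name, self.compute, self.inputs = env, name, compute, inputs

    def map(self, name: str, fn: Callable) -> "DataSet":
        return DataSet(self.env, name, lambda xs: [fn(x) for x in xs], [self])

    def write(self, sink: list) -> None:
        # Registering a sink only records it in the plan; nothing runs yet.
        self.env.sinks.append((self, sink))


class Env:
    def __init__(self) -> None:
        self.sinks: List[tuple] = []
        self.evaluations: Dict[str, int] = {}  # how often each operator ran

    def from_list(self, name: str, data: list) -> DataSet:
        return DataSet(self, name, lambda: list(data), [])

    def _run(self, ds: DataSet, cache: Dict[int, list]) -> list:
        # Within one job, each operator runs at most once, even if several sinks use it.
        if id(ds) not in cache:
            args = [self._run(i, cache) for i in ds.inputs]
            self.evaluations[ds.name] = self.evaluations.get(ds.name, 0) + 1
            cache[id(ds)] = ds.compute(*args)
        return cache[id(ds)]

    def execute(self) -> None:
        # All pending sinks form a single job that shares one cache.
        cache: Dict[int, list] = {}
        for ds, sink in self.sinks:
            sink.extend(self._run(ds, cache))
        self.sinks.clear()


env = Env()
src = env.from_list("src", [1, 2, 3])
out_a: list = []
out_b: list = []
src.map("double", lambda x: 2 * x).write(out_a)
src.map("square", lambda x: x * x).write(out_b)
print(out_a)  # [] (nothing has run yet)
env.execute()
print(out_a, out_b, env.evaluations["src"])  # [2, 4, 6] [1, 4, 9] 1
```

The source is evaluated only once although two sinks read from it, because both sinks are part of the same job.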

For certain operations, such as the "count()" and "collect()" functions added
in https://github.com/apache/flink/pull/210, we need to trigger execution
immediately.

The open question is how this should interact with data sinks that have
already been defined:

1) Should all data sinks defined so far be executed as well?
2) Should only the immediate operation be executed, with the data sinks
remaining pending until a call to "env.execute()"?

I am somewhat leaning towards the first option right now, because I think
that executing the sinks later may force re-execution of larger parts of the
plan.
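The re-execution argument can be illustrated with a second toy model (hypothetical, not Flink's API). Here `count()` is eager; with `run_pending_sinks_on_eager=True` it follows option 1 and runs the pending sinks in the same job, otherwise (option 2) the sinks wait for `execute()`. The sketch assumes that results are not kept between jobs.

```python
from typing import Callable, Dict, List


class Node:
    def __init__(self, name: str, fn: Callable[..., list], *inputs: "Node") -> None:
        self.name, self.fn, self.inputs = name, fn, inputs


class Env:
    """Toy environment (hypothetical): sinks are lazy, count() is eager."""

    def __init__(self, run_pending_sinks_on_eager: bool) -> None:
        self.option1 = run_pending_sinks_on_eager
        self.pending: List[tuple] = []
        self.runs: Dict[str, int] = {}

    def _eval(self, node: Node, cache: Dict[int, list]) -> list:
        if id(node) not in cache:
            args = [self._eval(i, cache) for i in node.inputs]
            self.runs[node.name] = self.runs.get(node.name, 0) + 1
            cache[id(node)] = node.fn(*args)
        return cache[id(node)]

    def _job(self, targets: List[Node]) -> List[list]:
        cache: Dict[int, list] = {}  # shared only within one job
        return [self._eval(t, cache) for t in targets]

    def write(self, node: Node, out: list) -> None:
        self.pending.append((node, out))

    def execute(self) -> None:
        results = self._job([n for n, _ in self.pending])
        for (_, out), res in zip(self.pending, results):
            out.extend(res)
        self.pending = []

    def count(self, node: Node) -> int:
        if self.option1:
            # Option 1: the eager call and all pending sinks form one job.
            sinks = self.pending
            results = self._job([node] + [n for n, _ in sinks])
            for (_, out), res in zip(sinks, results[1:]):
                out.extend(res)
            self.pending = []
            return len(results[0])
        # Option 2: only the eager call runs; the sinks wait for execute().
        return len(self._job([node])[0])


def scenario(option1: bool) -> Dict[str, int]:
    env = Env(option1)
    src = Node("expensive_source", lambda: list(range(10)))
    evens = Node("filter", lambda xs: [x for x in xs if x % 2 == 0], src)
    out: list = []
    env.write(evens, out)
    assert env.count(evens) == 5
    env.execute()  # no-op under option 1; runs the pending sink under option 2
    assert out == [0, 2, 4, 6, 8]
    return env.runs


print(scenario(True))   # {'expensive_source': 1, 'filter': 1}
print(scenario(False))  # {'expensive_source': 2, 'filter': 2}
```

Under option 1 the expensive source runs once; under option 2 it runs once for count() and again for execute().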

In addition, I think that the output of "print()" should go to the client's
command line. In that sense, it would behave like
"collect().foreach(print)".


Greetings,
Stephan
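A minimal sketch of the proposed print() semantics, assuming a hypothetical `run_on_cluster` helper that stands in for running a job and shipping its result back: print() becomes an eager collect() followed by printing in the client process, so the output shows up in the client's terminal rather than on the machines that execute the job.

```python
from typing import Callable, Iterable, List

Plan = Callable[[], Iterable[int]]  # hypothetical: a job is just a function producing records


def run_on_cluster(plan: Plan) -> List[int]:
    """Stand-in for submitting a job and shipping its result back to the client."""
    return list(plan())


def collect(plan: Plan) -> List[int]:
    # Eager: triggers execution immediately and returns the records to the client program.
    return run_on_cluster(plan)


def client_print(plan: Plan) -> List[str]:
    # print() modelled as collect().foreach(print): every line is written by the client process.
    lines = [str(record) for record in collect(plan)]
    for line in lines:
        print(line)
    return lines


def squares() -> Iterable[int]:
    return (x * x for x in range(4))


printed = client_print(squares)  # prints 0, 1, 4 and 9 on separate lines
```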

Re: API behavior with data sinks (lazy) and eager operations

Robert Metzger
I would also execute the sinks immediately. I think it's a corner case,
because the sinks are usually the last thing in a plan and all print() or
collect() statements come earlier in the plan.

print() should go to the client command line, yes.

On Mon, Jan 19, 2015 at 1:42 AM, Stephan Ewen <[hidden email]> wrote:

Re: API behavior with data sinks (lazy) and eager operations

Ufuk Celebi-2
I think the answer depends on how much the two subgraphs overlap. But in
general, I agree that the first approach seems more desirable from the
runtime's point of view (multiple consumers at the branch point).

On Mon, Jan 19, 2015 at 10:59 AM, Robert Metzger <[hidden email]> wrote:

Re: API behavior with data sinks (lazy) and eager operations

Till Rohrmann
I agree with Ufuk that it depends on how much both subgraphs, and also
future subgraphs, overlap. It is conceivable that the user will reuse
subgraphs of an already computed data sink after calling collect(). Then
we would also have to re-execute parts of the dataflow graph. I guess we
could easily find examples supporting both cases. But of course, once we
have checkpointing, we can manually checkpoint the future branching points.
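The reuse problem and the manual-checkpoint remedy can be sketched with a toy plan. This is hypothetical (`Plan`, `checkpoint` and `collect` are illustrative names, not a Flink API) and assumes that every eager call runs as its own job and that nothing survives a job unless it was checkpointed.

```python
from typing import Callable, Dict, Set


class Plan:
    """Toy plan where every eager call is a separate job (hypothetical, not Flink's API)."""

    def __init__(self) -> None:
        self.ops: Dict[str, tuple] = {}
        self.runs: Dict[str, int] = {}
        self.checkpoints: Set[str] = set()
        self.materialized: Dict[str, list] = {}

    def add(self, name: str, fn: Callable[..., list], *inputs: str) -> None:
        self.ops[name] = (fn, inputs)

    def checkpoint(self, name: str) -> None:
        # Mark an operator whose result should be kept after the job that computes it.
        self.checkpoints.add(name)

    def _eval(self, name: str, cache: Dict[str, list]) -> list:
        if name in self.materialized:
            return self.materialized[name]
        if name not in cache:
            fn, inputs = self.ops[name]
            args = [self._eval(i, cache) for i in inputs]
            self.runs[name] = self.runs.get(name, 0) + 1
            cache[name] = fn(*args)
            if name in self.checkpoints:
                self.materialized[name] = cache[name]
        return cache[name]

    def collect(self, name: str) -> list:
        return list(self._eval(name, {}))  # a fresh job: the per-job cache is discarded


def build(with_checkpoint: bool) -> Plan:
    p = Plan()
    p.add("source", lambda: list(range(6)))
    p.add("branch", lambda xs: [x * 10 for x in xs], "source")
    p.add("small", lambda xs: [x for x in xs if x < 30], "branch")
    p.add("large", lambda xs: [x for x in xs if x >= 30], "branch")
    if with_checkpoint:
        p.checkpoint("branch")
    return p


def run(with_checkpoint: bool) -> Dict[str, int]:
    p = build(with_checkpoint)
    assert p.collect("small") == [0, 10, 20]   # first eager call
    assert p.collect("large") == [30, 40, 50]  # the user reuses the same branch later
    return p.runs


print(run(False))  # {'source': 2, 'branch': 2, 'small': 1, 'large': 1}
print(run(True))   # {'source': 1, 'branch': 1, 'small': 1, 'large': 1}
```

Without the checkpoint, the second collect() recomputes `source` and `branch`; with it, only the new consumer runs.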

But I'm also leaning more towards 1).

On Mon, Jan 19, 2015 at 11:19 AM, Ufuk Celebi <[hidden email]> wrote:

Re: API behavior with data sinks (lazy) and eager operations

Fabian Hueske
This is a difficult question.

A program might also later refer to some intermediate data set that would
already have been computed if the sinks were executed together with the
count() call, and that would then need to be computed again.
Also, what do we do with sinks that are not connected to the collected or
counted data set? Executing them as well would not give any benefit, but
might result in higher costs if something on their path is used later.
On the other hand, a policy that distinguishes between connected and
unconnected sinks might not be easy for users to reason about.
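One way to make the connected/unconnected distinction precise is to compare the upstream operators of each pending sink with those of the collected data set. The sketch assumes a hypothetical plan representation (a dict from operator name to its inputs); a sink counts as connected if it shares at least one operator with the collected data set, and the shared operators are the part both would compute.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical plan representation: operator name -> names of the operators it reads from.
Plan = Dict[str, List[str]]


def upstream(plan: Plan, node: str) -> Set[str]:
    """Return `node` and every operator it transitively depends on."""
    seen: Set[str] = set()
    stack = [node]
    while stack:
        n = stack.pop()
        if n not in seen:
            seen.add(n)
            stack.extend(plan[n])
    return seen


def split_sinks(plan: Plan, sinks: List[str], collected: str) -> Tuple[Dict[str, Set[str]], List[str]]:
    """Map each connected sink to the operators it shares with `collected`; list the rest."""
    target = upstream(plan, collected)
    connected: Dict[str, Set[str]] = {}
    unconnected: List[str] = []
    for s in sinks:
        shared = upstream(plan, s) & target
        if shared:
            connected[s] = shared
        else:
            unconnected.append(s)
    return connected, unconnected


plan: Plan = {
    "logs": [], "parse": ["logs"], "errors": ["parse"], "stats": ["parse"],
    "users": [], "clean_users": ["users"],
    "stats_sink": ["stats"], "users_sink": ["clean_users"],
}
connected, unconnected = split_sinks(plan, ["stats_sink", "users_sink"], collected="errors")
print({s: sorted(v) for s, v in connected.items()})  # {'stats_sink': ['logs', 'parse']}
print(unconnected)  # ['users_sink']
```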

How about caching data sets where the data flow branches?
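Caching at branch points could look roughly like the toy below: any operator whose output feeds more than one consumer is treated as a branch point, and its result is kept across eager calls. This is a sketch under stated assumptions (results cached in a Python dict on the client; a real system would have to keep them on the cluster), and `CachingRunner` and `branch_points` are hypothetical names, not an existing Flink feature.

```python
from collections import Counter
from typing import Callable, Dict, Set, Tuple

# Hypothetical plan: operator -> (function, names of its inputs).
Plan = Dict[str, Tuple[Callable[..., list], Tuple[str, ...]]]


def branch_points(plan: Plan) -> Set[str]:
    """Operators whose output feeds more than one consumer."""
    consumers = Counter(i for _, inputs in plan.values() for i in inputs)
    return {op for op, n in consumers.items() if n > 1}


class CachingRunner:
    """Runs one job per eager call, keeping branch-point results across jobs (a sketch)."""

    def __init__(self, plan: Plan) -> None:
        self.plan = plan
        self.to_cache = branch_points(plan)
        self.cache: Dict[str, list] = {}
        self.runs: Counter = Counter()

    def _eval(self, op: str, job: Dict[str, list]) -> list:
        if op in self.cache:
            return self.cache[op]
        if op not in job:
            fn, inputs = self.plan[op]
            job[op] = fn(*[self._eval(i, job) for i in inputs])
            self.runs[op] += 1
            if op in self.to_cache:
                self.cache[op] = job[op]
        return job[op]

    def collect(self, op: str) -> list:
        return list(self._eval(op, {}))


plan: Plan = {
    "source": (lambda: list(range(1, 7)), ()),
    "parse": (lambda xs: [x * 3 for x in xs], ("source",)),
    "odd": (lambda xs: [x for x in xs if x % 2], ("parse",)),
    "even": (lambda xs: [x for x in xs if not x % 2], ("parse",)),
}
runner = CachingRunner(plan)
print(sorted(runner.to_cache))                        # ['parse']
print(runner.collect("odd"), runner.collect("even"))  # [3, 9, 15] [6, 12, 18]
print(dict(runner.runs))  # {'source': 1, 'parse': 1, 'odd': 1, 'even': 1}
```

The second eager call reuses the cached `parse` result, so the shared upstream part runs only once; the cost is holding that intermediate result in memory or storage.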

In any case, the policy should be easy to understand for users.

Cheers, Fabian

2015-01-19 11:19 GMT+01:00 Ufuk Celebi <[hidden email]>:

Re: API behavior with data sinks (lazy) and eager operations

Max Michels
Let's make it clear that count/collect-type actions execute the plan up to
that point (including the data sinks). From a user's perspective, this seems
most logical to me. The user might even rely on the data generated by the
sinks.

On Mon, Jan 19, 2015 at 11:46 AM, Fabian Hueske <[hidden email]> wrote: