Side-effects of DataSet::count

Side-effects of DataSet::count

Eron Wright
I was curious as to how the `count` method on DataSet works, and was surprised to see that it executes the entire program graph. Wouldn't this cause undesirable side-effects, like writing to sinks? It also seems strange that the graph is mutated by the addition of a sink that isn't subsequently removed.

Surveying the Flink code, there aren't many situations where the program graph is implicitly executed (`collect` is another). Nonetheless, this has deepened my appreciation for how dynamic the application can be.

// DataSet.java
public long count() throws Exception {
    // Unique key under which the counting sink registers its accumulator.
    final String id = new AbstractID().toString();

    // Attach a counting sink to this data set; this mutates the program graph.
    output(new Utils.CountHelper<T>(id)).name("count()");

    // Implicitly execute the whole program graph, including any other sinks.
    JobExecutionResult res = getExecutionEnvironment().execute();
    return res.<Long>getAccumulatorResult(id);
}
Eron
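To make the mechanism above concrete, here is a framework-free sketch of the accumulator pattern that `count()` appears to rely on: a counting "sink" records its result in a shared registry under a unique id, and the caller reads it back after execution. All names here (`CountSketch`, `ACCUMULATORS`, `countingSink`) are hypothetical stand-ins for illustration, not Flink API.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class CountSketch {
    // Stand-in for the job's accumulator registry (hypothetical; not Flink API).
    static final Map<String, Long> ACCUMULATORS = new ConcurrentHashMap<>();

    // Stand-in for Utils.CountHelper: a "sink" that counts records under a unique id.
    static <T> void countingSink(String id, List<T> records) {
        ACCUMULATORS.merge(id, (long) records.size(), Long::sum);
    }

    static long count(List<?> dataSet) {
        final String id = UUID.randomUUID().toString(); // plays the role of new AbstractID()
        countingSink(id, dataSet);                      // "execute" the counting sink
        return ACCUMULATORS.get(id);                    // read back the accumulator result
    }
}
```

The unique id matters because several counting sinks could register accumulators against the same execution; the random key keeps their results from colliding.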

Re: Side-effects of DataSet::count

Márton Balassi
Hey Eron,

Yes, the DataSet#collect and count methods implicitly trigger a JobGraph
execution, so they also trigger writing to any previously defined sinks.
The idea behind this behavior is to enable interactive querying (the kind
you are used to getting from a shell environment), and it is also a great
debugging tool.

Best,

Marton

On Sun, May 29, 2016 at 11:28 PM, Eron Wright <[hidden email]> wrote:

Re: Side-effects of DataSet::count

Eron Wright
Thinking out loud now…

Is the job graph fully mutable? Can it be cleared? For example, shouldn't the count method remove its sink after execution completes?

Can numerous job graphs co-exist within a single driver program? How would that relate to the session concept?

It seems the count method should use a 'backtracking' schedule mode and execute only the minimum needed to materialize the count sink.

> On May 29, 2016, at 3:08 PM, Márton Balassi <[hidden email]> wrote:


Re: Side-effects of DataSet::count

Stephan Ewen
Hi Eron!

Yes, the idea is to eventually switch all executions to a backtracking
scheduling mode. That would simultaneously solve both fine-grained recovery
and lazy execution, where later stages build on prior stages.

With all the work around streaming we have not gotten to this so far, but
it is still one feature on the list...

Greetings,
Stephan


On Mon, May 30, 2016 at 9:55 PM, Eron Wright <[hidden email]> wrote:

Re: Side-effects of DataSet::count

Simone Robutti
On this same subject, I have a question: is it possible to achieve a lazy
count that transforms a DataSet[T] into a DataSet[Long] with a single value
containing the length of the original DataSet? Otherwise, what is the best
way to count the elements lazily?

2016-05-30 23:49 GMT+02:00 Stephan Ewen <[hidden email]>:

Re: Side-effects of DataSet::count

Greg Hogan
Hi Simone,

This can be done with a map followed by a reduce. DataSet#count leverages
accumulators, which perform an inherent reduce. Also, DataSet#count
implements RichOutputFormat as an optimization, so that only a single
operator is required; previously the counting and accumulating were handled
in a RichMapFunction followed by a DiscardingOutputFormat.

This has come up in Gelly, since graphs have global metrics such as the
Global Clustering Coefficient, for which we need to count a DataSet of
triangles as well as the number of triplets. It would be great if there were
a way to both return a DataSet and compute an accumulated value optimally.

Greg
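The map-then-reduce counting pattern described above can be sketched with plain Java streams as a stand-in for Flink's DataSet (this is not the Flink API itself; the class and method names are hypothetical): map every element to 1L, then reduce by addition.

```java
import java.util.List;

public class LazyCount {
    // Count by mapping each element to a one and summing the ones.
    // java.util.stream plays the role of a DataSet here for illustration.
    static long mapReduceCount(List<?> elements) {
        return elements.stream()
                       .map(e -> 1L)           // T -> Long: every element becomes a one
                       .reduce(0L, Long::sum); // sum the ones into a single count
    }
}
```

In Flink itself the same shape would presumably be something like `dataSet.map(x -> 1L).reduce((a, b) -> a + b)`, which yields a DataSet[Long] with a single element and stays lazy until a sink or an implicit execution triggers the job.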

On Mon, May 30, 2016 at 6:27 PM, Simone Robutti <[hidden email]> wrote:

Re: Side-effects of DataSet::count

Greg Hogan
In reply to this post by Stephan Ewen
Hi Stephan,

Is there a design document, prior discussion, or background material on
this enhancement? Am I correct in understanding that this only applies to
DataSet since streams run indefinitely?

Thanks,
Greg


Re: Side-effects of DataSet::count

Stephan Ewen
Hi!

There was some preliminary work on this. By now the requirements have
grown a bit. The backtracking needs to handle:

  - scheduling for execution (the point raised here), possibly resuming
from available intermediate results
  - recovery of partially executed programs, where operators execute
wholly or not at all (batch style)
  - recovery from an intermediate result since the latest completed checkpoint
  - eventually, even recovery of superstep-based iterations.

So the design needs to be extended slightly. We do not have a design
writeup for this, but I agree it would be great to have one.
I have a pretty good general idea about it; let me see if I can get to
that next week.

In general, for such things (long-standing ideas and designs), we should
have something like Kafka has with its KIPs (Kafka Improvement Proposals):
a place to collect them, refine them over time, and
see how people react to them or step up to implement them. We could call
them 3Fs (Flink Feature Forms) ;-)

Greetings,
Stephan


On Tue, May 31, 2016 at 1:02 AM, Greg Hogan <[hidden email]> wrote:

Re: Side-effects of DataSet::count

Aljoscha Krettek-2
That last suggestion is a really good idea! I have several design docs
floating around that were announced on the ML; without a central place to
store them, they are hard to find.

-Aljoscha


On Tue, 31 May 2016 at 11:27 Stephan Ewen <[hidden email]> wrote:

Re: Side-effects of DataSet::count

Ovidiu-Cristian MARCU
In reply to this post by Stephan Ewen
Hi Stephan and all,

A reference for this may be https://issues.apache.org/jira/browse/FLINK-2250 ?
I understand your priorities on streaming are very high, but it would be a big +1 for the community to create a discussion place for the design-proposal improvements and eventually launch an initial draft (including the new requirements). As one tries to dig in, what you have already achieved is quite complex (for example FLINK-2097, FLINK-1350, FLINK-1359 and related, mainly FLINK-986). These issues are a pain for DataSets.

Best,
Ovidiu
