[DISCUSS] Change semantics of print() to "eager"

classic Classic list List threaded Threaded
13 messages Options
Reply | Threaded
Open this post in threaded view
|

[DISCUSS] Change semantics of print() to "eager"

Aljoscha Krettek-2
Hi Folks,
right now .print() on DataSet creates a DataSink that prints to the
local stdout of a TaskManager. This is not very helpful when running
in a distributed environment, especially when using something like an
interactive Scala Shell in a cluster.

I propose to change print() to use collect() internally and therefore
eagerly execute without requiring env.execute().

What do you think?

Aljoscha
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Stephan Ewen
I think this is the 3rd discussion about this ;-)

AFAIK, the consensus in previous discussions was to do it exactly like
collect() and print to the client.

The only open question was how do we deal with the break in the API. Right
now, the programs contain a "execute()" call after the print(), which would
then throw an exception because there is nothing to be executed that was
not already part of the print().


On Tue, Apr 28, 2015 at 10:18 AM, Aljoscha Krettek <[hidden email]>
wrote:

> Hi Folks,
> right now .print() on DataSet creates a DataSink that prints to the
> local stdout of a TaskManager. This is not very helpful when running
> in a distributed environment, especially when using something like an
> interactive Scala Shell in a cluster.
>
> I propose to change print() to use collect() internally and therefore
> eagerly execute without requiring env.execute().
>
> What do you think?
>
> Aljoscha
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Aljoscha Krettek-2
I think we should break the API and remove the unnecessary execute() calls.

On Tue, Apr 28, 2015 at 10:59 AM, Stephan Ewen <[hidden email]> wrote:

> I think this is the 3rd discussion about this ;-)
>
> AFAIK, the consensus in previous discussions was to do it exactly like
> collect() and print to the client.
>
> The only open question was how do we deal with the break in the API. Right
> now, the programs contain a "execute()" call after the print(), which would
> then throw an exception because there is nothing to be executed that was
> not already part of the print().
>
>
> On Tue, Apr 28, 2015 at 10:18 AM, Aljoscha Krettek <[hidden email]>
> wrote:
>
>> Hi Folks,
>> right now .print() on DataSet creates a DataSink that prints to the
>> local stdout of a TaskManager. This is not very helpful when running
>> in a distributed environment, especially when using something like an
>> interactive Scala Shell in a cluster.
>>
>> I propose to change print() to use collect() internally and therefore
>> eagerly execute without requiring env.execute().
>>
>> What do you think?
>>
>> Aljoscha
>>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Stephan Ewen
+1 for the breaking change

Let's not to this any more than necessary, bu this is a good case...

On Tue, Apr 28, 2015 at 11:23 AM, Aljoscha Krettek <[hidden email]>
wrote:

> I think we should break the API and remove the unnecessary execute() calls.
>
> On Tue, Apr 28, 2015 at 10:59 AM, Stephan Ewen <[hidden email]> wrote:
> > I think this is the 3rd discussion about this ;-)
> >
> > AFAIK, the consensus in previous discussions was to do it exactly like
> > collect() and print to the client.
> >
> > The only open question was how do we deal with the break in the API.
> Right
> > now, the programs contain a "execute()" call after the print(), which
> would
> > then throw an exception because there is nothing to be executed that was
> > not already part of the print().
> >
> >
> > On Tue, Apr 28, 2015 at 10:18 AM, Aljoscha Krettek <[hidden email]>
> > wrote:
> >
> >> Hi Folks,
> >> right now .print() on DataSet creates a DataSink that prints to the
> >> local stdout of a TaskManager. This is not very helpful when running
> >> in a distributed environment, especially when using something like an
> >> interactive Scala Shell in a cluster.
> >>
> >> I propose to change print() to use collect() internally and therefore
> >> eagerly execute without requiring env.execute().
> >>
> >> What do you think?
> >>
> >> Aljoscha
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Ufuk Celebi-2

On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]> wrote:

> +1 for the breaking change
>
> Let's not to this any more than necessary, bu this is a good case...

+1
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Fabian Hueske-2
+1 for the breaking change

2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:

>
> On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]> wrote:
>
> > +1 for the breaking change
> >
> > Let's not to this any more than necessary, bu this is a good case...
>
> +1
>
mxm
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

mxm
I agree, print should print on the client. However, let's introduce some
big hint in the error message in case of a second execute() that this error
may arise from a previous execution.

Instead of "No sinks defined", let's print "The Flink job didn't contain
any sinks. This may be because the sinks were already executed. If you
executed the print() method on a DataSet before, the job would have already
been executed. In this case, remove the call of execute() until you have
defined further sinks".

On Tue, Apr 28, 2015 at 1:24 PM, Fabian Hueske <[hidden email]> wrote:

> +1 for the breaking change
>
> 2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:
>
> >
> > On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]> wrote:
> >
> > > +1 for the breaking change
> > >
> > > Let's not to this any more than necessary, bu this is a good case...
> >
> > +1
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Stephan Ewen
Sounds good, Max, let's to this in one fix.

We can maintain a counter in the ExecutionEnvironment that tracks how many
executions have happened.
In case of no prior execution, simply warn that no sinks are defined.
In case a prior execution happened, point out that nothing new is pending
execution.

On Tue, Apr 28, 2015 at 2:04 PM, Maximilian Michels <[hidden email]> wrote:

> I agree, print should print on the client. However, let's introduce some
> big hint in the error message in case of a second execute() that this error
> may arise from a previous execution.
>
> Instead of "No sinks defined", let's print "The Flink job didn't contain
> any sinks. This may be because the sinks were already executed. If you
> executed the print() method on a DataSet before, the job would have already
> been executed. In this case, remove the call of execute() until you have
> defined further sinks".
>
> On Tue, Apr 28, 2015 at 1:24 PM, Fabian Hueske <[hidden email]> wrote:
>
> > +1 for the breaking change
> >
> > 2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:
> >
> > >
> > > On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]> wrote:
> > >
> > > > +1 for the breaking change
> > > >
> > > > Let's not to this any more than necessary, bu this is a good case...
> > >
> > > +1
> > >
> >
>
mxm
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

mxm
+1 Very nice addition.

On Tue, Apr 28, 2015 at 2:12 PM, Stephan Ewen <[hidden email]> wrote:

> Sounds good, Max, let's to this in one fix.
>
> We can maintain a counter in the ExecutionEnvironment that tracks how many
> executions have happened.
> In case of no prior execution, simply warn that no sinks are defined.
> In case a prior execution happened, point out that nothing new is pending
> execution.
>
> On Tue, Apr 28, 2015 at 2:04 PM, Maximilian Michels <[hidden email]>
> wrote:
>
> > I agree, print should print on the client. However, let's introduce some
> > big hint in the error message in case of a second execute() that this
> error
> > may arise from a previous execution.
> >
> > Instead of "No sinks defined", let's print "The Flink job didn't contain
> > any sinks. This may be because the sinks were already executed. If you
> > executed the print() method on a DataSet before, the job would have
> already
> > been executed. In this case, remove the call of execute() until you have
> > defined further sinks".
> >
> > On Tue, Apr 28, 2015 at 1:24 PM, Fabian Hueske <[hidden email]>
> wrote:
> >
> > > +1 for the breaking change
> > >
> > > 2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:
> > >
> > > >
> > > > On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]> wrote:
> > > >
> > > > > +1 for the breaking change
> > > > >
> > > > > Let's not to this any more than necessary, bu this is a good
> case...
> > > >
> > > > +1
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Till Rohrmann
+1

On Tue, Apr 28, 2015 at 3:19 PM, Maximilian Michels <[hidden email]> wrote:

> +1 Very nice addition.
>
> On Tue, Apr 28, 2015 at 2:12 PM, Stephan Ewen <[hidden email]> wrote:
>
> > Sounds good, Max, let's to this in one fix.
> >
> > We can maintain a counter in the ExecutionEnvironment that tracks how
> many
> > executions have happened.
> > In case of no prior execution, simply warn that no sinks are defined.
> > In case a prior execution happened, point out that nothing new is pending
> > execution.
> >
> > On Tue, Apr 28, 2015 at 2:04 PM, Maximilian Michels <[hidden email]>
> > wrote:
> >
> > > I agree, print should print on the client. However, let's introduce
> some
> > > big hint in the error message in case of a second execute() that this
> > error
> > > may arise from a previous execution.
> > >
> > > Instead of "No sinks defined", let's print "The Flink job didn't
> contain
> > > any sinks. This may be because the sinks were already executed. If you
> > > executed the print() method on a DataSet before, the job would have
> > already
> > > been executed. In this case, remove the call of execute() until you
> have
> > > defined further sinks".
> > >
> > > On Tue, Apr 28, 2015 at 1:24 PM, Fabian Hueske <[hidden email]>
> > wrote:
> > >
> > > > +1 for the breaking change
> > > >
> > > > 2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:
> > > >
> > > > >
> > > > > On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]> wrote:
> > > > >
> > > > > > +1 for the breaking change
> > > > > >
> > > > > > Let's not to this any more than necessary, bu this is a good
> > case...
> > > > >
> > > > > +1
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Robert Metzger
Why aren't we adding a new method "printLocal()" which is doing the local
printing?
That would not break any existing code.

How much work would it be to implement this "properly" without piggybacking
the accumulators?
I assume we would need to write the data to one or more partitions and
request the partitions from the client JVM.
I also assume that we have the code in place now to do it like this.

On Tue, Apr 28, 2015 at 3:45 PM, Till Rohrmann <[hidden email]> wrote:

> +1
>
> On Tue, Apr 28, 2015 at 3:19 PM, Maximilian Michels <[hidden email]>
> wrote:
>
> > +1 Very nice addition.
> >
> > On Tue, Apr 28, 2015 at 2:12 PM, Stephan Ewen <[hidden email]> wrote:
> >
> > > Sounds good, Max, let's to this in one fix.
> > >
> > > We can maintain a counter in the ExecutionEnvironment that tracks how
> > many
> > > executions have happened.
> > > In case of no prior execution, simply warn that no sinks are defined.
> > > In case a prior execution happened, point out that nothing new is
> pending
> > > execution.
> > >
> > > On Tue, Apr 28, 2015 at 2:04 PM, Maximilian Michels <[hidden email]>
> > > wrote:
> > >
> > > > I agree, print should print on the client. However, let's introduce
> > some
> > > > big hint in the error message in case of a second execute() that this
> > > error
> > > > may arise from a previous execution.
> > > >
> > > > Instead of "No sinks defined", let's print "The Flink job didn't
> > contain
> > > > any sinks. This may be because the sinks were already executed. If
> you
> > > > executed the print() method on a DataSet before, the job would have
> > > already
> > > > been executed. In this case, remove the call of execute() until you
> > have
> > > > defined further sinks".
> > > >
> > > > On Tue, Apr 28, 2015 at 1:24 PM, Fabian Hueske <[hidden email]>
> > > wrote:
> > > >
> > > > > +1 for the breaking change
> > > > >
> > > > > 2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:
> > > > >
> > > > > >
> > > > > > On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]> wrote:
> > > > > >
> > > > > > > +1 for the breaking change
> > > > > > >
> > > > > > > Let's not to this any more than necessary, bu this is a good
> > > case...
> > > > > >
> > > > > > +1
> > > > > >
> > > > >
> > > >
> > >
> >
>
mxm
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

mxm
>
> Why aren't we adding a new method "printLocal()" which is doing the local
> printing?


Simply because it is counter-intuitive that a regular print does not print
on the Client. Introducing a new method would not solve this confusion. I
think it's fine to break the API as long as we provide a meaningful error
message.

It should be ok to use the Accumulator way. We have to force an execution
either way to compute the result to be printed. Printing shouldn't be used
for large output anyways.

On Tue, Apr 28, 2015 at 3:49 PM, Robert Metzger <[hidden email]> wrote:

> Why aren't we adding a new method "printLocal()" which is doing the local
> printing?
> That would not break any existing code.
>
> How much work would it be to implement this "properly" without piggybacking
> the accumulators?
> I assume we would need to write the data to one or more partitions and
> request the partitions from the client JVM.
> I also assume that we have the code in place now to do it like this.
>
> On Tue, Apr 28, 2015 at 3:45 PM, Till Rohrmann <[hidden email]>
> wrote:
>
> > +1
> >
> > On Tue, Apr 28, 2015 at 3:19 PM, Maximilian Michels <[hidden email]>
> > wrote:
> >
> > > +1 Very nice addition.
> > >
> > > On Tue, Apr 28, 2015 at 2:12 PM, Stephan Ewen <[hidden email]>
> wrote:
> > >
> > > > Sounds good, Max, let's to this in one fix.
> > > >
> > > > We can maintain a counter in the ExecutionEnvironment that tracks how
> > > many
> > > > executions have happened.
> > > > In case of no prior execution, simply warn that no sinks are defined.
> > > > In case a prior execution happened, point out that nothing new is
> > pending
> > > > execution.
> > > >
> > > > On Tue, Apr 28, 2015 at 2:04 PM, Maximilian Michels <[hidden email]>
> > > > wrote:
> > > >
> > > > > I agree, print should print on the client. However, let's introduce
> > > some
> > > > > big hint in the error message in case of a second execute() that
> this
> > > > error
> > > > > may arise from a previous execution.
> > > > >
> > > > > Instead of "No sinks defined", let's print "The Flink job didn't
> > > contain
> > > > > any sinks. This may be because the sinks were already executed. If
> > you
> > > > > executed the print() method on a DataSet before, the job would have
> > > > already
> > > > > been executed. In this case, remove the call of execute() until you
> > > have
> > > > > defined further sinks".
> > > > >
> > > > > On Tue, Apr 28, 2015 at 1:24 PM, Fabian Hueske <[hidden email]>
> > > > wrote:
> > > > >
> > > > > > +1 for the breaking change
> > > > > >
> > > > > > 2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:
> > > > > >
> > > > > > >
> > > > > > > On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]>
> wrote:
> > > > > > >
> > > > > > > > +1 for the breaking change
> > > > > > > >
> > > > > > > > Let's not to this any more than necessary, bu this is a good
> > > > case...
> > > > > > >
> > > > > > > +1
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Change semantics of print() to "eager"

Stephan Ewen
Implementing this "properly" is the same thing as supporting larger
collect() payloads and larger accumulators.

For that, data needs to go through the BLOB manager, rather than be part of
the actor messages.

Stephan

On Tue, Apr 28, 2015 at 5:27 PM, Maximilian Michels <[hidden email]> wrote:

> >
> > Why aren't we adding a new method "printLocal()" which is doing the local
> > printing?
>
>
> Simply because it is counter-intuitive that a regular print does not print
> on the Client. Introducing a new method would not solve this confusion. I
> think it's fine to break the API as long as we provide a meaningful error
> message.
>
> It should be ok to use the Accumulator way. We have to force an execution
> either way to compute the result to be printed. Printing shouldn't be used
> for large output anyways.
>
> On Tue, Apr 28, 2015 at 3:49 PM, Robert Metzger <[hidden email]>
> wrote:
>
> > Why aren't we adding a new method "printLocal()" which is doing the local
> > printing?
> > That would not break any existing code.
> >
> > How much work would it be to implement this "properly" without
> piggybacking
> > the accumulators?
> > I assume we would need to write the data to one or more partitions and
> > request the partitions from the client JVM.
> > I also assume that we have the code in place now to do it like this.
> >
> > On Tue, Apr 28, 2015 at 3:45 PM, Till Rohrmann <[hidden email]>
> > wrote:
> >
> > > +1
> > >
> > > On Tue, Apr 28, 2015 at 3:19 PM, Maximilian Michels <[hidden email]>
> > > wrote:
> > >
> > > > +1 Very nice addition.
> > > >
> > > > On Tue, Apr 28, 2015 at 2:12 PM, Stephan Ewen <[hidden email]>
> > wrote:
> > > >
> > > > > Sounds good, Max, let's to this in one fix.
> > > > >
> > > > > We can maintain a counter in the ExecutionEnvironment that tracks
> how
> > > > many
> > > > > executions have happened.
> > > > > In case of no prior execution, simply warn that no sinks are
> defined.
> > > > > In case a prior execution happened, point out that nothing new is
> > > pending
> > > > > execution.
> > > > >
> > > > > On Tue, Apr 28, 2015 at 2:04 PM, Maximilian Michels <
> [hidden email]>
> > > > > wrote:
> > > > >
> > > > > > I agree, print should print on the client. However, let's
> introduce
> > > > some
> > > > > > big hint in the error message in case of a second execute() that
> > this
> > > > > error
> > > > > > may arise from a previous execution.
> > > > > >
> > > > > > Instead of "No sinks defined", let's print "The Flink job didn't
> > > > contain
> > > > > > any sinks. This may be because the sinks were already executed.
> If
> > > you
> > > > > > executed the print() method on a DataSet before, the job would
> have
> > > > > already
> > > > > > been executed. In this case, remove the call of execute() until
> you
> > > > have
> > > > > > defined further sinks".
> > > > > >
> > > > > > On Tue, Apr 28, 2015 at 1:24 PM, Fabian Hueske <
> [hidden email]>
> > > > > wrote:
> > > > > >
> > > > > > > +1 for the breaking change
> > > > > > >
> > > > > > > 2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:
> > > > > > >
> > > > > > > >
> > > > > > > > On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]>
> > wrote:
> > > > > > > >
> > > > > > > > > +1 for the breaking change
> > > > > > > > >
> > > > > > > > > Let's not to this any more than necessary, bu this is a
> good
> > > > > case...
> > > > > > > >
> > > > > > > > +1
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>