Hi Folks,
right now .print() on DataSet creates a DataSink that prints to the local stdout of a TaskManager. This is not very helpful when running in a distributed environment, especially when using something like an interactive Scala Shell in a cluster.

I propose to change print() to use collect() internally and therefore eagerly execute without requiring env.execute().

What do you think?

Aljoscha

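The difference between the current and the proposed behavior can be illustrated with a small plain-Java model. This is only a sketch and does not use Flink: `ToyEnv`, `ToyDataSet`, `printLazy` and `printEager` are hypothetical names, and the two output lists merely stand in for a TaskManager's stdout and the client's stdout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Small demo driver for the toy model below.
class PrintDemo {
    public static void main(String[] args) {
        ToyEnv env = new ToyEnv();
        ToyDataSet<Integer> ds = new ToyDataSet<>(env, Arrays.asList(1, 2, 3));

        ds.printLazy(); // only registers a sink
        System.out.println("worker output before execute(): " + env.workerOutput);
        env.execute();  // now the sink runs, on the "worker"
        System.out.println("worker output after execute(): " + env.workerOutput);

        ds.printEager(); // runs immediately, output lands on the "client"
        System.out.println("client output: " + env.clientOutput);
    }
}

// Hypothetical stand-in for an execution environment (not Flink code).
class ToyEnv {
    private final List<Runnable> pendingSinks = new ArrayList<>();
    final List<String> workerOutput = new ArrayList<>(); // models TaskManager stdout
    final List<String> clientOutput = new ArrayList<>(); // models client stdout

    void addSink(Runnable sink) {
        pendingSinks.add(sink);
    }

    // Runs every pending sink once and returns how many were run.
    int execute() {
        int count = pendingSinks.size();
        for (Runnable sink : pendingSinks) {
            sink.run();
        }
        pendingSinks.clear();
        return count;
    }
}

// Hypothetical stand-in for a data set (not Flink code).
class ToyDataSet<T> {
    private final ToyEnv env;
    private final List<T> elements;

    ToyDataSet(ToyEnv env, List<T> elements) {
        this.env = env;
        this.elements = new ArrayList<>(elements);
    }

    // Models the current behavior: register a sink, print on the worker later.
    void printLazy() {
        env.addSink(() -> {
            for (T e : elements) {
                env.workerOutput.add(String.valueOf(e));
            }
        });
    }

    // Models collect(): the elements are returned to the caller right away.
    List<T> collect() {
        return new ArrayList<>(elements);
    }

    // Models the proposal: print() built on collect(), output on the client.
    void printEager() {
        for (T e : collect()) {
            env.clientOutput.add(String.valueOf(e));
        }
    }
}
```

In the model, `printLazy()` produces nothing until `execute()` is called, while `printEager()` needs no `execute()` at all, which is the behavior change under discussion.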
I think this is the 3rd discussion about this ;-)
AFAIK, the consensus in previous discussions was to do it exactly like collect() and print to the client.

The only open question was how we deal with the break in the API. Right now, programs contain an "execute()" call after the print(), which would then throw an exception because there is nothing to be executed that was not already part of the print().

On Tue, Apr 28, 2015 at 10:18 AM, Aljoscha Krettek <[hidden email]> wrote:
> [...]

I think we should break the API and remove the unnecessary execute() calls.

On Tue, Apr 28, 2015 at 10:59 AM, Stephan Ewen <[hidden email]> wrote:
> [...]

+1 for the breaking change
Let's not do this any more than necessary, but this is a good case...

On Tue, Apr 28, 2015 at 11:23 AM, Aljoscha Krettek <[hidden email]> wrote:
> [...]

On 28 Apr 2015, at 12:31, Stephan Ewen <[hidden email]> wrote:
> +1 for the breaking change
>
> Let's not do this any more than necessary, but this is a good case...

+1

+1 for the breaking change

2015-04-28 13:18 GMT+02:00 Ufuk Celebi <[hidden email]>:
> [...]

I agree, print should print on the client. However, let's introduce a big hint in the error message in case of a second execute() that this error may arise from a previous execution.

Instead of "No sinks defined", let's print "The Flink job didn't contain any sinks. This may be because the sinks were already executed. If you executed the print() method on a DataSet before, the job would have already been executed. In this case, remove the call of execute() until you have defined further sinks".

On Tue, Apr 28, 2015 at 1:24 PM, Fabian Hueske <[hidden email]> wrote:
> [...]

Sounds good, Max, let's do this in one fix.

We can maintain a counter in the ExecutionEnvironment that tracks how many executions have happened.
In case of no prior execution, simply warn that no sinks are defined.
In case a prior execution happened, point out that nothing new is pending execution.

On Tue, Apr 28, 2015 at 2:04 PM, Maximilian Michels <[hidden email]> wrote:
> [...]

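The counter idea could look roughly like the following plain-Java sketch. It is not the Flink implementation: `TrackingEnv`, its methods, and both message constants are hypothetical, and throwing `IllegalStateException` in both cases is an assumption (the thread speaks of an exception for the empty plan but also of "warning").

```java
import java.util.ArrayList;
import java.util.List;

// Small demo driver for the sketch below.
class CounterDemo {
    public static void main(String[] args) {
        TrackingEnv env = new TrackingEnv();

        // Case 1: execute() without any sink and without a prior execution.
        try {
            env.execute();
        } catch (IllegalStateException e) {
            System.out.println("first call: " + e.getMessage());
        }

        // Case 2: a sink is defined and executed, then execute() is called again.
        env.addSink(() -> System.out.println("sink ran"));
        env.execute();
        try {
            env.execute();
        } catch (IllegalStateException e) {
            System.out.println("second call: " + e.getMessage());
        }
    }
}

// Hypothetical environment that remembers how often it has executed.
class TrackingEnv {
    // Illustrative wording only; the thread proposes a longer message.
    static final String NO_SINKS = "No sinks defined.";
    static final String NOTHING_NEW =
        "No new sinks have been defined since the last execution. "
        + "If print() was called before, the job has already been executed; "
        + "remove the execute() call until further sinks are defined.";

    private final List<Runnable> pendingSinks = new ArrayList<>();
    private int executionCount = 0;

    void addSink(Runnable sink) {
        pendingSinks.add(sink);
    }

    int getExecutionCount() {
        return executionCount;
    }

    // Runs all pending sinks; picks the error message based on the counter.
    void execute() {
        if (pendingSinks.isEmpty()) {
            throw new IllegalStateException(
                executionCount == 0 ? NO_SINKS : NOTHING_NEW);
        }
        for (Runnable sink : pendingSinks) {
            sink.run();
        }
        pendingSinks.clear();
        executionCount++;
    }
}
```

The counter is what lets the environment tell a program that never defined a sink apart from one whose sinks were already consumed by an earlier, eager execution.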
+1 Very nice addition.

On Tue, Apr 28, 2015 at 2:12 PM, Stephan Ewen <[hidden email]> wrote:
> [...]

+1

On Tue, Apr 28, 2015 at 3:19 PM, Maximilian Michels <[hidden email]> wrote:
> [...]

Why aren't we adding a new method "printLocal()" that does the local printing?
That would not break any existing code.

How much work would it be to implement this "properly" without piggybacking on the accumulators?
I assume we would need to write the data to one or more partitions and request the partitions from the client JVM.
I also assume that we have the code in place now to do it like this.

On Tue, Apr 28, 2015 at 3:45 PM, Till Rohrmann <[hidden email]> wrote:
> [...]

> Why aren't we adding a new method "printLocal()" which is doing the local
> printing?

Simply because it is counter-intuitive that a regular print does not print on the client. Introducing a new method would not solve this confusion. I think it's fine to break the API as long as we provide a meaningful error message.

It should be ok to use the Accumulator way. We have to force an execution either way to compute the result to be printed. Printing shouldn't be used for large output anyway.

On Tue, Apr 28, 2015 at 3:49 PM, Robert Metzger <[hidden email]> wrote:
> [...]

Implementing this "properly" is the same thing as supporting larger collect() payloads and larger accumulators.

For that, data needs to go through the BLOB manager, rather than be part of the actor messages.

Stephan

On Tue, Apr 28, 2015 at 5:27 PM, Maximilian Michels <[hidden email]> wrote:
> [...]

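One way to picture "going through the BLOB manager rather than the actor messages" is a size-based switch: small results travel inline in the message, large ones are stored separately and only a key is sent. The sketch below is a plain-Java illustration under that assumption. `ToyBlobStore`, `ResultEnvelope`, and the `maxInlineBytes` threshold are all hypothetical; the thread does not specify how Flink would do this.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Small demo driver for the sketch below.
class BlobDemo {
    public static void main(String[] args) {
        ToyBlobStore store = new ToyBlobStore();
        byte[] tiny = "tiny".getBytes(StandardCharsets.UTF_8);

        ResultEnvelope small = ResultEnvelope.wrap(tiny, 16, store);
        ResultEnvelope large = ResultEnvelope.wrap(new byte[1024], 16, store);

        System.out.println("small result inline: " + small.isInline());
        System.out.println("large result inline: " + large.isInline());
        System.out.println("large result size on client: " + large.resolve(store).length);
    }
}

// Hypothetical in-memory stand-in for a blob service.
class ToyBlobStore {
    private final Map<Integer, byte[]> blobs = new HashMap<>();
    private int nextKey = 0;

    // Stores a copy of the data and returns the key to fetch it later.
    int put(byte[] data) {
        int key = nextKey++;
        blobs.put(key, data.clone());
        return key;
    }

    byte[] get(int key) {
        byte[] data = blobs.get(key);
        if (data == null) {
            throw new IllegalArgumentException("unknown blob key: " + key);
        }
        return data.clone();
    }
}

// Hypothetical message payload: either the bytes themselves or a blob key.
final class ResultEnvelope {
    private final byte[] inlineData; // null when the payload was offloaded
    private final int blobKey;       // -1 when the payload is inline

    private ResultEnvelope(byte[] inlineData, int blobKey) {
        this.inlineData = inlineData;
        this.blobKey = blobKey;
    }

    // Keeps payloads up to maxInlineBytes in the message, offloads the rest.
    static ResultEnvelope wrap(byte[] payload, int maxInlineBytes, ToyBlobStore store) {
        if (payload.length <= maxInlineBytes) {
            return new ResultEnvelope(payload.clone(), -1);
        }
        return new ResultEnvelope(null, store.put(payload));
    }

    boolean isInline() {
        return inlineData != null;
    }

    // Client side: returns the bytes, fetching them from the store if needed.
    byte[] resolve(ToyBlobStore store) {
        return isInline() ? inlineData.clone() : store.get(blobKey);
    }
}
```

With such a switch, the message itself stays small regardless of result size, which is the property the accumulator-based approach lacks for large outputs.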