Should collect() and count() be treated as data sinks?


Felix Neutatz
Hi,

I have run the following program:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Build a one-element data set of Tuple1<Long>.
List<Tuple1<Long>> l = Arrays.asList(new Tuple1<Long>(1L));
TypeInformation<Tuple1<Long>> t = TypeInfoParser.parse("Tuple1<Long>");
DataSet<Tuple1<Long>> data = env.fromCollection(l, t);

long value = data.count();
System.out.println(value);

// This call throws the exception shown below.
env.execute("example");


Since there is no "real" data sink, I get the following:
Exception in thread "main" java.lang.RuntimeException: No data sinks have
been created yet. A program needs at least one sink that consumes data.
Examples are writing the data set or printing it.

In my opinion, we should handle count() and collect() like print().

What do you think?

Best regards,

Felix

Re: Should collect() and count() be treated as data sinks?

Aljoscha Krettek-2
In my opinion they should not be handled like print(). The idea behind
count()/collect() is that they immediately return the result, which can
then be used in further Flink operations.

Right now, this is not implemented properly or efficiently, but once we
have support for intermediate results at this level they will make
more sense. In that case, no execute() would be required
after a collect()/count() if only the result of that call is needed.

On Thu, Apr 2, 2015 at 5:33 PM, Felix Neutatz <[hidden email]> wrote:

> Hi,
>
> I have run the following program:
>
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> List l = Arrays.asList(new Tuple1<Long>(1L));
> TypeInformation t = TypeInfoParser.parse("Tuple1<Long>");
> DataSet<Tuple1<Long>> data = env.fromCollection(l, t);
>
> long value = data.count();
> System.out.println(value);
>
> env.execute("example");
>
>
> Since there is no "real" data sink, I get the following:
> Exception in thread "main" java.lang.RuntimeException: No data sinks have
> been created yet. A program needs at least one sink that consumes data.
> Examples are writing the data set or printing it.
>
> In my opinion, we should handle count() and collect() like print().
>
> What do you think?
>
> Best regards,
>
> Felix

Re: Should collect() and count() be treated as data sinks?

mxm
In reply to this post by Felix Neutatz
Hi Felix,

count() defines a sink through the DiscardingOutputFormat. The error you're
seeing occurs because execution of the plan is already triggered inside the
count() method. When you then call env.execute(), the plan has already been
cleared from the ExecutionEnvironment, so there is nothing left to execute.
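
The sequence described above can be sketched with a toy model. This is not Flink code: ToyEnvironment, addSink and the string sink names are hypothetical stand-ins, and the only assumption carried over from the thread is that count() registers an internal discarding sink, executes immediately, and leaves an empty plan behind.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Flink code) of an environment whose eager count() clears the plan. */
class ToyEnvironment {
    // Sinks registered so far; stands in for the pending plan.
    private final List<String> sinks = new ArrayList<>();

    /** Lazy: only registers a sink, nothing runs yet. */
    void addSink(String name) {
        sinks.add(name);
    }

    /** Runs the pending plan and clears it. Fails if no sink is registered. */
    int execute() {
        if (sinks.isEmpty()) {
            throw new RuntimeException("No data sinks have been created yet.");
        }
        int executed = sinks.size();
        sinks.clear();
        return executed;
    }

    /** Eager: registers an internal discarding sink and executes right away. */
    long count(List<?> data) {
        addSink("discarding");
        execute();
        return data.size();
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        long value = env.count(Arrays.asList(1L));
        System.out.println(value);
        try {
            env.execute(); // the plan is already empty at this point
        } catch (RuntimeException e) {
            System.out.println("second execute failed: " + e.getMessage());
        }
    }
}
```

In terms of the original program, this corresponds to dropping the trailing env.execute("example") call when count() is the only thing the program needs.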

We should probably make it a bit more obvious how count and collect behave.
At least, we should improve the docs with a big warning.

In the future, when we can resume executions, we might delay the execution
in count/collect. On execute, we then execute once to get the accumulator
result for count/collect, and a second time to resume and execute the rest
of the plan. That is, if the current implementation remains the same...

Best,
Max



Re: Should collect() and count() be treated as data sinks?

aalexandrov
In reply to this post by Felix Neutatz
I have a similar issue here:

I would like to run a dataflow up to a particular point and materialize (in
memory) the intermediate result. Is this possible at the moment?

Regards,
Alex


Re: Should collect() and count() be treated as data sinks?

Stephan Ewen
In reply to this post by Aljoscha Krettek-2
count() and collect() need to immediately trigger an execution, because the
driver program cannot proceed otherwise. They are "eager".

Regular sinks are "lazy": they wait until the program is triggered anyway.

BTW: Should "print()" also be an "eager" statement? I think it needs to be,
if we want to print to the driver's std out.
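
To make the eager/lazy distinction concrete, here is a small illustration in plain Java. It is a sketch and not Flink code: EagerLazyDemo, lazyPrint, eagerCollect and the log list are hypothetical names chosen for this example. The point it shows is only the ordering: a lazy sink does nothing until execute(), while an eager action runs at the call site because the driver needs the value to continue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy illustration (not Flink code) of lazy sinks versus eager actions. */
class EagerLazyDemo {
    final List<Runnable> pendingSinks = new ArrayList<>(); // deferred work
    final List<String> log = new ArrayList<>();            // order in which things ran

    /** Lazy sink: remembers what to do, runs only on execute(). */
    void lazyPrint(List<Integer> data) {
        pendingSinks.add(() -> log.add("lazy print " + data));
    }

    /** Eager action: the driver needs the value now, so it runs immediately. */
    List<Integer> eagerCollect(List<Integer> data) {
        log.add("eager collect " + data);
        return new ArrayList<>(data);
    }

    /** Triggers all deferred sinks, then forgets them. */
    void execute() {
        pendingSinks.forEach(Runnable::run);
        pendingSinks.clear();
    }

    public static void main(String[] args) {
        EagerLazyDemo demo = new EagerLazyDemo();
        List<Integer> data = Arrays.asList(1, 2);
        demo.lazyPrint(data);                       // registered first, not run yet
        List<Integer> local = demo.eagerCollect(data); // runs immediately
        demo.execute();                             // the lazy sink runs only now
        System.out.println(local + " " + demo.log);
    }
}
```

Although lazyPrint is called first, the log records the eager collect before the lazy print, which is why a print() that writes to the driver's standard output would have to behave like eagerCollect here.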

On Thu, Apr 2, 2015 at 5:51 PM, Aljoscha Krettek <[hidden email]>
wrote:

> In my opinion it should not be handled like print. The idea behind
> count()/collect() is that they immediately return the result which can
> then be used in further flink operations.
>
> Right now, this is not properly/efficiently implemented but once we
> have support for intermediate results on this level they start making
> more sense. Also, in such a case an execute would not be required
> after a collect()/count() if only the result of that call is required.

Re: Should collect() and count() be treated as data sinks?

aalexandrov
> Should "print()" be also an "eager" statement?

I would expect this to be the case as I can only imagine an implementation
of print() via collect().


Re: Should collect() and count() be treated as data sinks?

mxm
In reply to this post by Stephan Ewen
On Mon, Apr 6, 2015 at 2:37 PM, Stephan Ewen <[hidden email]> wrote:

> BTW: Should "print()" be also an "eager" statement? I think it needs to be,
> if we want to print to the driver's std out


Yes, if we change print() to print on the Client, then it needs to execute
eagerly.

On Thu, Apr 2, 2015 at 6:59 PM, Alexander Alexandrov <
[hidden email]> wrote:

> I would like to run a dataflow up to a particular point and materialize (in
> memory) the intermediate result. Is this possible at the moment?
>

Blocking execution mode is already implemented but the results are
currently not cached. That means that they are lost after they are consumed.
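
As a loose analogy only (java.util.stream has nothing to do with Flink's runtime), a Java Stream behaves like an uncached result: it can be consumed once, and a second consumer finds nothing. Caching corresponds to materializing the result into a collection that can be read repeatedly. The class and method names below are made up for this sketch.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Analogy (not Flink code): a consume-once result versus a materialized one. */
class ConsumeOnceDemo {

    /** Returns true if a second consumption of the same stream is rejected. */
    static boolean secondReadFails() {
        Stream<Integer> result = Stream.of(1, 2, 3).map(x -> x * 2);
        result.collect(Collectors.toList()); // first consumer uses up the stream
        try {
            result.collect(Collectors.toList()); // second consumer
            return false;
        } catch (IllegalStateException e) {
            return true; // a stream cannot be traversed twice
        }
    }

    /** Materializes the result once so that it can be read any number of times. */
    static List<Integer> materialize() {
        return Stream.of(1, 2, 3).map(x -> x * 2).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("second read fails: " + secondReadFails());
        List<Integer> cached = materialize();
        System.out.println(cached + " " + cached); // reading twice is fine
    }
}
```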


Re: Should collect() and count() be treated as data sinks?

Stephan Ewen
For the sake of prototyping, can you use a utility that simply materializes
the intermediate result to a file system (using typeInfo input and output
formats)?
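
A rough sketch of such a utility, under loud assumptions: plain Java serialization stands in for Flink's type-information-based input and output formats, a local temp file stands in for the file system, and FileMaterializer with its write/read methods is a hypothetical name. It only shows the write-then-read-back round trip.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch (not Flink code): park an intermediate result in a file and read it back. */
class FileMaterializer {

    /** Writes the intermediate result to a temp file and returns its path. */
    static Path write(List<Long> intermediate) throws IOException {
        Path file = Files.createTempFile("intermediate-", ".bin");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            // Copy into an ArrayList so the written object is known to be serializable.
            out.writeObject(new ArrayList<>(intermediate));
        }
        return file;
    }

    /** Reads back a result previously stored with write(). */
    @SuppressWarnings("unchecked")
    static List<Long> read(Path file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return (List<Long>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = write(Arrays.asList(1L, 2L, 3L)); // end of the first job
        List<Long> restored = read(file);             // start of the follow-up job
        System.out.println(restored);
        Files.delete(file);
    }
}
```

The first job would end with the write step and the follow-up job would start from the read step, at the cost of one round trip through storage.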

On Tue, Apr 7, 2015 at 6:21 PM, Maximilian Michels <[hidden email]> wrote:

> On Mon, Apr 6, 2015 at 2:37 PM, Stephan Ewen <[hidden email]> wrote:
>
> > BTW: Should "print()" be also an "eager" statement? I think it needs to be,
> > if we want to print to the driver's std out
>
>
> Yes, if we change print() to print on the Client, then it needs to execute
> eagerly.
>
> On Thu, Apr 2, 2015 at 6:59 PM, Alexander Alexandrov <
> [hidden email]> wrote:
>
> > I would like to run a dataflow up to a particular point and materialize (in
> > memory) the intermediate result. Is this possible at the moment?
> >
>
> Blocking execution mode is already implemented but the results are
> currently not cached. That means that they are lost after they are
> consumed.