Hi,
I have run the following program:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

List l = Arrays.asList(new Tuple1<Long>(1L));
TypeInformation t = TypeInfoParser.parse("Tuple1<Long>");
DataSet<Tuple1<Long>> data = env.fromCollection(l, t);

long value = data.count();
System.out.println(value);

env.execute("example");

Since there is no "real" data sink, I get the following:

Exception in thread "main" java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.

In my opinion, we should handle count() and collect() like print().

What do you think?

Best regards,
Felix
In my opinion it should not be handled like print. The idea behind count()/collect() is that they immediately return the result, which can then be used in further Flink operations.

Right now, this is not properly/efficiently implemented, but once we have support for intermediate results on this level they start making more sense. Also, in such a case an execute() would not be required after a collect()/count() if only the result of that call is needed.

On Thu, Apr 2, 2015 at 5:33 PM, Felix Neutatz <[hidden email]> wrote:
> In my opinion, we should handle count() and collect() like print().
In reply to this post by Felix Neutatz
Hi Felix,
count() defines a sink through the DiscardingOutputFormat. The error you're seeing occurs because the execution of the plan is already triggered within the count() method. When you call env.execute() again, the plan has already been cleared from the ExecutionEnvironment, so it fails to execute.

We should probably make it a bit more obvious how count and collect behave. At the very least, we should improve the docs with a big warning.

In the future, when we can resume executions, we might delay the execution in count/collect. On execute, we would then execute once to get the accumulator result for count/collect, and a second time to resume and execute the rest of the plan. That is, if the current implementation remains the same...

Best,
Max

On Thu, Apr 2, 2015 at 5:33 PM, Felix Neutatz <[hidden email]> wrote:
> Since there is no "real" data sink, I get the following:
> Exception in thread "main" java.lang.RuntimeException: No data sinks have
> been created yet. A program needs at least one sink that consumes data.
> Examples are writing the data set or printing it.
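The behaviour described above can be illustrated with a small toy model. This is only a sketch in plain Java, not Flink's implementation: ToyEnv, addSink, execute and count are hypothetical names invented for this example, and the real ExecutionEnvironment is considerably more involved. It assumes only what the post states: count() registers its own sink, triggers execution immediately, and the plan is cleared afterwards, so a following execute() finds no sinks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these classes are Flink's.
class EagerCountDemo {

    /** Hypothetical stand-in for an environment that collects sinks lazily. */
    static class ToyEnv {
        private final List<Runnable> sinks = new ArrayList<>();

        /** Lazy: registers a sink; nothing runs until execute() is called. */
        void addSink(Runnable sink) {
            sinks.add(sink);
        }

        /** Runs all registered sinks and clears the plan before doing so. */
        void execute() {
            if (sinks.isEmpty()) {
                throw new RuntimeException("No data sinks have been created yet.");
            }
            List<Runnable> plan = new ArrayList<>(sinks);
            sinks.clear();
            for (Runnable sink : plan) {
                sink.run();
            }
        }

        /** Eager: registers a counting sink and triggers execution right away. */
        long count(List<?> data) {
            long[] counter = {0};
            addSink(() -> counter[0] = data.size());
            execute();
            return counter[0];
        }
    }

    public static void main(String[] args) {
        ToyEnv env = new ToyEnv();
        long value = env.count(Arrays.asList(1L));
        System.out.println(value);
        try {
            // The plan was already executed and cleared inside count().
            env.execute();
        } catch (RuntimeException e) {
            System.out.println("second execute failed: " + e.getMessage());
        }
    }
}
```

In terms of the original program, this suggests that the trailing env.execute("example") is simply unnecessary when the result of count() is all that is needed.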
In reply to this post by Felix Neutatz
I have a similar issue here:
I would like to run a dataflow up to a particular point and materialize (in memory) the intermediate result. Is this possible at the moment?

Regards,
Alex
In reply to this post by Aljoscha Krettek-2
count() and collect() need to immediately trigger an execution, because the driver program cannot proceed otherwise. They are "eager".

Regular sinks are "lazy": they wait until the program is triggered anyway.

BTW: Should "print()" also be an "eager" statement? I think it needs to be, if we want to print to the driver's std out.

On Thu, Apr 2, 2015 at 5:51 PM, Aljoscha Krettek <[hidden email]> wrote:
> In my opinion it should not be handled like print. The idea behind
> count()/collect() is that they immediately return the result which can
> then be used in further flink operations.
> Should "print()" be also an "eager" statement?
I would expect this to be the case, as I can only imagine an implementation of print() via collect().
In reply to this post by Stephan Ewen
On Mon, Apr 6, 2015 at 2:37 PM, Stephan Ewen <[hidden email]> wrote:
> BTW: Should "print()" be also an "eager" statement? I think it needs to be,
> if we want to print to the driver's std out

Yes, if we change print() to print on the Client, then it needs to execute eagerly.

On Thu, Apr 2, 2015 at 6:59 PM, Alexander Alexandrov <[hidden email]> wrote:
> I would like to run a dataflow up to a particular point and materialize (in
> memory) the intermediate result. Is this possible at the moment?

Blocking execution mode is already implemented but the results are currently not cached. That means that they are lost after they are consumed.
For the sake of prototyping, can you use a util that simply materializes the intermediate result in a file system (using typeInfo input and output formats)?

On Tue, Apr 7, 2015 at 6:21 PM, Maximilian Michels <[hidden email]> wrote:
> Blocking execution mode is already implemented but the results are
> currently not cached. That means that they are lost after they are
> consumed.
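The shape of such a utility might look roughly like the following. This is a plain-Java sketch using only java.nio, not Flink's input and output formats: MaterializeDemo, materialize and reload are hypothetical names, and the one-value-per-line text encoding is an assumption made to keep the example short. In an actual job, the write step would be a sink at the end of the first program and the read step a source at the start of the next.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: stands in for "write with an output format, read back with an input format".
class MaterializeDemo {

    /** Writes the intermediate result to a temporary file, one value per line. */
    static Path materialize(List<Long> intermediate) {
        try {
            Path file = Files.createTempFile("intermediate", ".txt");
            List<String> lines = intermediate.stream()
                    .map(String::valueOf)
                    .collect(Collectors.toList());
            Files.write(file, lines);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the materialized result back so a later step can consume it again. */
    static List<Long> reload(Path file) {
        try {
            return Files.readAllLines(file).stream()
                    .map(Long::parseLong)
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = materialize(Arrays.asList(1L, 2L, 3L));
        // The file can be read any number of times, unlike a result that is lost once consumed.
        System.out.println(reload(file));
        System.out.println(reload(file));
    }
}
```

The trade-off is the extra round trip through the file system, which is why it is suggested here only for prototyping.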