Cluster execution of an example program ("Word count") and a problem related to the modificated example

classic Classic list List threaded Threaded
16 messages Options
Reply | Threaded
Open this post in threaded view
|

Cluster execution of an example program ("Word count") and a problem related to the modificated example

Krzysztof Pasierbinski
Hi all,
I am new to Flink/Stratosphere so I would like to welcome everyone. I was trying to configure a simple 2-node cluster (Ubuntu 12.04) and I came across following problems (using Stratosphere 0.5):

1)      I execute a "Word count" example and after the execution I get the result split into 2 files (each one on the different node). How can I modify this program to get the whole result as a one merged file on the master node?

2)      I have wrote my own program basing on a "Word count" program's structure. There aren't any problems with the execution in a local mode for any size of the file. For small files it works fine on the cluster too. Unfortunately I get following error for bigger files (2000 lines):

06/28/2014 18:15:20:    Job execution switched to status SCHEDULED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c>) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c>) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0>) (1/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0>) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (2/2) switched to SCHEDULED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0>) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c>) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (1/2) switched to ASSIGNED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0>) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c>) (2/2) switched to ASSIGNED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to READY
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to READY
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to STARTING
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to STARTING
06/28/2014 18:15:20:    Job execution switched to status RUNNING
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to RUNNING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to READY
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to STARTING
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to RUNNING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to RUNNING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c>) (1/2) switched to CANCELING
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c>) (1/2) switched to CANCELED
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20:    Reduce (SUM(2),MAX(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c>) (2/2) switched to CANCELING
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1c2ab6c>) (2/2) switched to CANCELED
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (1/2) switched to CANCELING
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (1/2) switched to CANCELED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0>) (1/2) switched to CANCELING
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0>) (1/2) switched to CANCELED
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20:    Reduce (SUM(2),MIN(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0>) (2/2) switched to CANCELING
06/28/2014 18:15:20:    DataSink(eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0<mailto:eu.stratosphere.api.java.io.CsvOutputFormat@1e24af0>) (2/2) switched to CANCELED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer) -> Combine (SUM(2),MAX(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (2/2) switched to CANCELING
06/28/2014 18:15:20:    CHAIN FlatMap (eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2) -> Combine (SUM(2),MIN(1)) (2/2) switched to CANCELED
06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to FAILED
java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
    at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
    at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:140)
    at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
    at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
    at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (2/2) switched to CANCELED
06/28/2014 18:15:20:    Job execution switched to status FAILED
Error: The program execution failed: java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
    at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
    at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:140)
    at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
    at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
    at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

eu.stratosphere.client.program.ProgramInvocationException: The program execution failed: java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
    at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
    at java.lang.Thread.run(Thread.java:701)
Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:140)
    at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
    at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
    at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

    at eu.stratosphere.client.program.Client.run(Client.java:297)
    at eu.stratosphere.client.program.Client.run(Client.java:263)
    at eu.stratosphere.client.program.Client.run(Client.java:257)
    at eu.stratosphere.client.program.ContextEnvironment.execute(ContextEnvironment.java:50)
    at eu.stratosphere.example.java.wordcount.WordCount.main(WordCount.java:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
    at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:301)
    at eu.stratosphere.client.program.Client.run(Client.java:215)
    at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
    at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
    at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
    at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)

Cluster configuration:
Slaves:
192.168.11.216
192.168.11.202

Job Manager:
jobmanager.rpc.address: 192.168.11.202


I would be very grateful for any help.

Best regards,
Krzysztof Pasierbiński

Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Ufuk Celebi
Hey Krzysztof,

thanks for posting your questions. Replies are inline.

On 28 Jun 2014, at 18:59, Krzysztof Pasierbinski <[hidden email]> wrote:

> Hi all,
> I am new to Flink/Stratosphere so I would like to welcome everyone. I was trying to configure a simple 2-node cluster (Ubuntu 12.04) and I came across following problems (using Stratosphere 0.5):
>
> 1)      I execute a "Word count" example and after the execution I get the result split into 2 files (each one on the different node). How can I modify this program to get the whole result as a one merged file on the master node?

You can add an ungrouped group reduce, which would create a single file.

input
.flatMap(...)
.groupBy(0)
.sum(1)
.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                                        @Override
                                        public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
                                                while (values.hasNext()) {
                                                        out.collect(values.next());
                                                }
                                        }
                                });


Because the reducers work on separate groups of data and run on different machines you get a file for each reducer. Along the same lines you can also directly use a GroupReduceFunction for the sum and save the extra reduce. But keep in mind that the multiple files should not be a problem in practice (since further processing steps would just have the output directory as input) and the group reduce does not scale to large data sets (since a single reducer on a single machine does all the processing).

>
> 2)      I have wrote my own program basing on a "Word count" program's structure. There aren't any problems with the execution in a local mode for any size of the file. For small files it works fine on the cluster too. Unfortunately I get following error for bigger files (2000 lines):
>
> 06/28/2014 18:15:20:    DataSource(TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies.txt) - UTF-8) (1/2) switched to FAILED
> java.io.IOException: Error opening the Input Split file:/home/krzysztof/stratosphere05/generatedFrequencies.txt [41847,41847]: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
>    at eu.stratosphere.api.common.io.FileInputFormat.open(FileInputFormat.java:616)
>    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:441)
>    at eu.stratosphere.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:44)
>    at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:140)
>    at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:351)
>    at java.lang.Thread.run(Thread.java:701)
> Caused by: java.io.FileNotFoundException: /home/krzysztof/stratosphere05/generatedFrequencies.txt (No such file or directory)
>    at java.io.FileInputStream.open(Native Method)
>    at java.io.FileInputStream.<init>(FileInputStream.java:140)
>    at eu.stratosphere.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:44)
>    at eu.stratosphere.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:135)
>    at eu.stratosphere.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:756)

With this error message I'm not sure at the moment, if it is a problem with the InputSplit or if the file really does not exist. So just to make sure: do both machines have the same DFS mounted? If you want you can also provide me with the program and file, so I can check if it is a problem with your setup or not.

Best,

Ufuk
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Stephan Ewen
Hey!

I think Ufuk is onto the right problem. Form the logs it seems as if the
master can access the files, but the workers cannot.

You can always get the result in a single file, by setting the parallelism
of the sink task to one, for example line
"result.writeAsText(path).parallelism(1)".

The counting reducers will run in parallel, but ship their results to a
single file writer.

We are planning to add support to move program results back to the
master/client, but it will be a few weeks. Of course, this is only feasibly
if the results are not huge.

Greetings,
Stephan
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Ufuk Celebi

> On 28 Jun 2014, at 22:52, Stephan Ewen <[hidden email]> wrote:
>
> Hey!
>
> You can always get the result in a single file, by setting the parallelism
> of the sink task to one, for example line
> "result.writeAsText(path).parallelism(1)".

Oh sure. I realized this after sending the mail. Thanks for pointing it out. :)
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Aljoscha Krettek-2
Hi Krzysztof,
for the file acces problem: From the path it looks like you are accessing
them as local files rather than as files in a distributed file system (HDFS
is the default here). So one of the nodes can access the file because it is
actually on the machine where the code is running while the other code
executes on a machine where the file is not available. This explains how to
setup hadoop with HDFS:
http://hadoop.apache.org/docs/r1.2.1/cluster_setup.html . You only need to
start HDFS, though,  with "bin/start-dfs.sh". For accessing files inside
HDFS from flink you would use a path such as "hdfs:///foo/bar"

Please write again if you need more help.

Aljoscha


On Sat, Jun 28, 2014 at 10:57 PM, Ufuk Celebi <[hidden email]> wrote:

>
> > On 28 Jun 2014, at 22:52, Stephan Ewen <[hidden email]> wrote:
> >
> > Hey!
> >
> > You can always get the result in a single file, by setting the
> parallelism
> > of the sink task to one, for example line
> > "result.writeAsText(path).parallelism(1)".
>
> Oh sure. I realized this after sending the mail. Thanks for pointing it
> out. :)
>
Reply | Threaded
Open this post in threaded view
|

AW: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Krzysztof Pasierbinski
Hi all,
thank you all for prompt replies. It is great to know, that there is so strong community support. Yes indeed, I don't use Hadoop yet. I wanted to try out Flink framework and then integrate it with Hadoop. I have read somewhere that Hadoop is not obligatory.
I wonder, why the same program with the same configuration works fine for small files and this error appears only for the bigger ones. The example program "Word count" works always fine, so I suppose that there is my mistake somewhere behind.


-----Ursprüngliche Nachricht-----
Von: Aljoscha Krettek [mailto:[hidden email]]
Gesendet: Sonntag, 29. Juni 2014 09:24
An: [hidden email]
Betreff: Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Hi Krzysztof,
for the file acces problem: From the path it looks like you are accessing them as local files rather than as files in a distributed file system (HDFS is the default here). So one of the nodes can access the file because it is actually on the machine where the code is running while the other code executes on a machine where the file is not available. This explains how to setup hadoop with HDFS:
http://hadoop.apache.org/docs/r1.2.1/cluster_setup.html . You only need to start HDFS, though,  with "bin/start-dfs.sh". For accessing files inside HDFS from flink you would use a path such as "hdfs:///foo/bar"

Please write again if you need more help.

Aljoscha


On Sat, Jun 28, 2014 at 10:57 PM, Ufuk Celebi <[hidden email]> wrote:

>
> > On 28 Jun 2014, at 22:52, Stephan Ewen <[hidden email]> wrote:
> >
> > Hey!
> >
> > You can always get the result in a single file, by setting the
> parallelism
> > of the sink task to one, for example line
> > "result.writeAsText(path).parallelism(1)".
>
> Oh sure. I realized this after sending the mail. Thanks for pointing
> it out. :)
>
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Fabian Hueske-2
Hi Krzysztof,

reading and writing data from the local file system in a distributed setup
is always a bit tricky.
For Flink, the input files must be available in the local file system of
each work node (and I think also on the master node), i.e., the data needs
to be copied (replicated) to each machine or the directory must be shared
for example via NFS (note, using shared directory might cause very bad IO
performance).
For output files, the result path must be available on each machine.

As Aljoscha said, the preferred way to go is to use a distributed
filesystem (HDFS) in distributed scenario.
Nonetheless, using the local FS in distributed setup should work.

Have you checked if your input data
(/home/krzysztof/stratosphere05/generatedFrequencies.txt) is available on
each machine (workers + master)?

Cheers, Fabian



2014-06-29 15:06 GMT+02:00 Krzysztof Pasierbinski <
[hidden email]>:

> Hi all,
> thank you all for prompt replies. It is great to know, that there is so
> strong community support. Yes indeed, I don't use Hadoop yet. I wanted to
> try out Flink framework and then integrate it with Hadoop. I have read
> somewhere that Hadoop is not obligatory.
> I wonder, why the same program with the same configuration works fine for
> small files and this error appears only for the bigger ones. The example
> program "Word count" works always fine, so I suppose that there is my
> mistake somewhere behind.
>
>
> -----Ursprüngliche Nachricht-----
> Von: Aljoscha Krettek [mailto:[hidden email]]
> Gesendet: Sonntag, 29. Juni 2014 09:24
> An: [hidden email]
> Betreff: Re: Cluster execution of an example program ("Word count") and a
> problem related to the modificated example
>
> Hi Krzysztof,
> for the file acces problem: From the path it looks like you are accessing
> them as local files rather than as files in a distributed file system (HDFS
> is the default here). So one of the nodes can access the file because it is
> actually on the machine where the code is running while the other code
> executes on a machine where the file is not available. This explains how to
> setup hadoop with HDFS:
> http://hadoop.apache.org/docs/r1.2.1/cluster_setup.html . You only need
> to start HDFS, though,  with "bin/start-dfs.sh". For accessing files inside
> HDFS from flink you would use a path such as "hdfs:///foo/bar"
>
> Please write again if you need more help.
>
> Aljoscha
>
>
> On Sat, Jun 28, 2014 at 10:57 PM, Ufuk Celebi <[hidden email]>
> wrote:
>
> >
> > > On 28 Jun 2014, at 22:52, Stephan Ewen <[hidden email]> wrote:
> > >
> > > Hey!
> > >
> > > You can always get the result in a single file, by setting the
> > parallelism
> > > of the sink task to one, for example line
> > > "result.writeAsText(path).parallelism(1)".
> >
> > Oh sure. I realized this after sending the mail. Thanks for pointing
> > it out. :)
> >
>
Reply | Threaded
Open this post in threaded view
|

AW: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Krzysztof Pasierbinski
Hi Fabian,
thank you for the explanation. I was conscious, that there is no input file on the worker node and for small files it worked fine. I assumed, that the part of the input file is replicated automatically. The result path is available on both machines as I set up the same file system on each node. I see that I have to use HDFS for my use case.

-----Ursprüngliche Nachricht-----
Von: Fabian Hueske [mailto:[hidden email]]
Gesendet: Sonntag, 29. Juni 2014 16:05
An: [hidden email]
Betreff: Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Hi Krzysztof,

reading and writing data from the local file system in a distributed setup is always a bit tricky.
For Flink, the input files must be available in the local file system of each work node (and I think also on the master node), i.e., the data needs to be copied (replicated) to each machine or the directory must be shared for example via NFS (note, using shared directory might cause very bad IO performance).
For output files, the result path must be available on each machine.

As Aljoscha said, the preferred way to go is to use a distributed filesystem (HDFS) in distributed scenario.
Nonetheless, using the local FS in distributed setup should work.

Have you checked if your input data
(/home/krzysztof/stratosphere05/generatedFrequencies.txt) is available on each machine (workers + master)?

Cheers, Fabian



2014-06-29 15:06 GMT+02:00 Krzysztof Pasierbinski <
[hidden email]>:

> Hi all,
> thank you all for prompt replies. It is great to know, that there is
> so strong community support. Yes indeed, I don't use Hadoop yet. I
> wanted to try out Flink framework and then integrate it with Hadoop. I
> have read somewhere that Hadoop is not obligatory.
> I wonder, why the same program with the same configuration works fine
> for small files and this error appears only for the bigger ones. The
> example program "Word count" works always fine, so I suppose that
> there is my mistake somewhere behind.
>
>
> -----Ursprüngliche Nachricht-----
> Von: Aljoscha Krettek [mailto:[hidden email]]
> Gesendet: Sonntag, 29. Juni 2014 09:24
> An: [hidden email]
> Betreff: Re: Cluster execution of an example program ("Word count")
> and a problem related to the modificated example
>
> Hi Krzysztof,
> for the file acces problem: From the path it looks like you are
> accessing them as local files rather than as files in a distributed
> file system (HDFS is the default here). So one of the nodes can access
> the file because it is actually on the machine where the code is
> running while the other code executes on a machine where the file is
> not available. This explains how to setup hadoop with HDFS:
> http://hadoop.apache.org/docs/r1.2.1/cluster_setup.html . You only
> need to start HDFS, though,  with "bin/start-dfs.sh". For accessing
> files inside HDFS from flink you would use a path such as "hdfs:///foo/bar"
>
> Please write again if you need more help.
>
> Aljoscha
>
>
> On Sat, Jun 28, 2014 at 10:57 PM, Ufuk Celebi <[hidden email]>
> wrote:
>
> >
> > > On 28 Jun 2014, at 22:52, Stephan Ewen <[hidden email]> wrote:
> > >
> > > Hey!
> > >
> > > You can always get the result in a single file, by setting the
> > parallelism
> > > of the sink task to one, for example line
> > > "result.writeAsText(path).parallelism(1)".
> >
> > Oh sure. I realized this after sending the mail. Thanks for pointing
> > it out. :)
> >
>
Reply | Threaded
Open this post in threaded view
|

AW: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Krzysztof Pasierbinski
Hi Fabian,
I have copied the input file to the second node and it worked. Indeed, I have to use HDFS. Although, I don't still understand why in this case size (of the input file) matters.
Once more time, thank you a lot for your help!

-----Ursprüngliche Nachricht-----
Von: Krzysztof Pasierbinski [mailto:[hidden email]]
Gesendet: Sonntag, 29. Juni 2014 16:17
An: [hidden email]
Betreff: AW: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Hi Fabian,
thank you for the explanation. I was conscious, that there is no input file on the worker node and for small files it worked fine. I assumed, that the part of the input file is replicated automatically. The result path is available on both machines as I set up the same file system on each node. I see that I have to use HDFS for my use case.

-----Ursprüngliche Nachricht-----
Von: Fabian Hueske [mailto:[hidden email]]
Gesendet: Sonntag, 29. Juni 2014 16:05
An: [hidden email]
Betreff: Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Hi Krzysztof,

reading and writing data from the local file system in a distributed setup is always a bit tricky.
For Flink, the input files must be available in the local file system of each work node (and I think also on the master node), i.e., the data needs to be copied (replicated) to each machine or the directory must be shared for example via NFS (note, using shared directory might cause very bad IO performance).
For output files, the result path must be available on each machine.

As Aljoscha said, the preferred way to go is to use a distributed filesystem (HDFS) in distributed scenario.
Nonetheless, using the local FS in distributed setup should work.

Have you checked if your input data
(/home/krzysztof/stratosphere05/generatedFrequencies.txt) is available on each machine (workers + master)?

Cheers, Fabian



2014-06-29 15:06 GMT+02:00 Krzysztof Pasierbinski <
[hidden email]>:

> Hi all,
> thank you all for prompt replies. It is great to know, that there is
> so strong community support. Yes indeed, I don't use Hadoop yet. I
> wanted to try out Flink framework and then integrate it with Hadoop. I
> have read somewhere that Hadoop is not obligatory.
> I wonder, why the same program with the same configuration works fine
> for small files and this error appears only for the bigger ones. The
> example program "Word count" works always fine, so I suppose that
> there is my mistake somewhere behind.
>
>
> -----Ursprüngliche Nachricht-----
> Von: Aljoscha Krettek [mailto:[hidden email]]
> Gesendet: Sonntag, 29. Juni 2014 09:24
> An: [hidden email]
> Betreff: Re: Cluster execution of an example program ("Word count")
> and a problem related to the modificated example
>
> Hi Krzysztof,
> for the file acces problem: From the path it looks like you are
> accessing them as local files rather than as files in a distributed
> file system (HDFS is the default here). So one of the nodes can access
> the file because it is actually on the machine where the code is
> running while the other code executes on a machine where the file is
> not available. This explains how to setup hadoop with HDFS:
> http://hadoop.apache.org/docs/r1.2.1/cluster_setup.html . You only
> need to start HDFS, though,  with "bin/start-dfs.sh". For accessing
> files inside HDFS from flink you would use a path such as "hdfs:///foo/bar"
>
> Please write again if you need more help.
>
> Aljoscha
>
>
> On Sat, Jun 28, 2014 at 10:57 PM, Ufuk Celebi <[hidden email]>
> wrote:
>
> >
> > > On 28 Jun 2014, at 22:52, Stephan Ewen <[hidden email]> wrote:
> > >
> > > Hey!
> > >
> > > You can always get the result in a single file, by setting the
> > parallelism
> > > of the sink task to one, for example line
> > > "result.writeAsText(path).parallelism(1)".
> >
> > Oh sure. I realized this after sending the mail. Thanks for pointing
> > it out. :)
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Stephan Ewen
Hi Krzysztof!

Indeed, the input size should not matter. Can you tell us the details of
the setup that worked?

The built-in examples work without distributed file system, because they do
not depend on files. The example programs set a Java Collection as the
input, which gets distributed as part of the program.

Stephan
Reply | Threaded
Open this post in threaded view
|

AW: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Krzysztof Pasierbinski
Hi Stephan,
I have got 2 node configuration. The first node is a master and a worker, the second node is a worker. File path, Flink (Stratosphere) version and operation system is the same on both nodes.
My test program is in the attachment (simple modification of "Word count" example).  
The execution plan looks like this:
{
    "nodes": [

    {
        "id": 4,
        "type": "source",
        "pact": "Data Source",
        "contents": "TextInputFormat (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "3..6 GB" },
            { "name": "Est. Cardinality", "value": "98.43 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "3..6 GB" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 3,
        "type": "pact",
        "pact": "FlatMap",
        "contents": "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 4, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Map",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 2,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MAX(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 3, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Sorted Combine",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 1,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MAX(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 2, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"}
        ],
        "driver_strategy": "Sorted Group Reduce",
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "(unknown)" },
            { "name": "Disk I/O", "value": "(unknown)" },
            { "name": "CPU", "value": "(unknown)" },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 0,
        "type": "sink",
        "pact": "Data Sink",
        "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 1, "ship_strategy": "Forward"}
        ],
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 8,
        "type": "pact",
        "pact": "FlatMap",
        "contents": "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 4, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Map",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 7,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MIN(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 8, "ship_strategy": "Forward"}
        ],
        "driver_strategy": "Sorted Combine",
        "global_properties": [
            { "name": "Partitioning", "value": "RANDOM" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "(none)" },
            { "name": "Grouping", "value": "not grouped" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "492.19 M" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "0.0 B" },
            { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
            { "name": "Cumulative CPU", "value": "0.0 " }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 6,
        "type": "pact",
        "pact": "GroupReduce",
        "contents": "SUM(2),MIN(1)",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 7, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"}
        ],
        "driver_strategy": "Sorted Group Reduce",
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "(unknown)" },
            { "name": "Disk I/O", "value": "(unknown)" },
            { "name": "CPU", "value": "(unknown)" },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    },
    {
        "id": 5,
        "type": "sink",
        "pact": "Data Sink",
        "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
        "parallelism": "2",
        "subtasks_per_instance": "1",
        "predecessors": [
            {"id": 6, "ship_strategy": "Forward"}
        ],
        "global_properties": [
            { "name": "Partitioning", "value": "HASH_PARTITIONED" },
            { "name": "Partitioned on", "value": "[0]" },
            { "name": "Partitioning Order", "value": "(none)" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "local_properties": [
            { "name": "Order", "value": "[0:ASC]" },
            { "name": "Grouped on", "value": "[0]" },
            { "name": "Uniqueness", "value": "not unique" }
        ],
        "estimates": [
            { "name": "Est. Output Size", "value": "(unknown)" },
            { "name": "Est. Cardinality", "value": "(unknown)" }        ],
        "costs": [
            { "name": "Network", "value": "0.0 B" },
            { "name": "Disk I/O", "value": "0.0 B" },
            { "name": "CPU", "value": "0.0 " },
            { "name": "Cumulative Network", "value": "(unknown)" },
            { "name": "Cumulative Disk I/O", "value": "(unknown)" },
            { "name": "Cumulative CPU", "value": "(unknown)" }
        ],
        "compiler_hints": [
            { "name": "Output Size (bytes)", "value": "(none)" },
            { "name": "Output Cardinality", "value": "(none)" },
            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
            { "name": "Filter Factor", "value": "(none)" }        ]
    }
    ]
}

-----Ursprüngliche Nachricht-----
Von: [hidden email] [mailto:[hidden email]] Im Auftrag von Stephan Ewen
Gesendet: Sonntag, 29. Juni 2014 16:33
An: [hidden email]
Betreff: Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Hi Krzysztof!

Indeed, the input size should not matter. Can you tell us the details of the setup that worked?

The built-in examples work without distributed file system, because they do not depend on files. The example programs set a Java Collection as the input, which gets distributed as part of the program.

Stephan
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Stephan Ewen
Hey Krzysztof!

Everything looks standard there.

Let me ask those questions, to make sure I get the discussion right:
 - You are running a two node setup. Has one node the master and a worker,
the other one has a worker only? Or do you have a dedicated master node?
 - Are the example on small data and on large data strictly the same,
except for differently sized input files?

Most importantly:
 - It the input file is available on the workers, is it available under the
path "/home/krzysztof/stratosphere05/generatedFrequencies2.txt" ?

My guess right now is still that there the workers do not see the file
properly.

Greetings,
Stephan




On Sun, Jun 29, 2014 at 4:50 PM, Krzysztof Pasierbinski <
[hidden email]> wrote:

> Hi Stephan,
> I have got 2 node configuration. The first node is a master and a worker,
> the second node is a worker. File path, Flink (Stratosphere) version and
> operation system is the same on both nodes.
> My test program is in the attachment (simple modification of "Word count"
> example).
> The execution plan looks like this:
> {
>     "nodes": [
>
>     {
>         "id": 4,
>         "type": "source",
>         "pact": "Data Source",
>         "contents": "TextInputFormat
> (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "3..6 GB" },
>             { "name": "Est. Cardinality", "value": "98.43 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "3..6 GB" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 3,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 2,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 3, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 1,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 2, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 0,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 1, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 8,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 7,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 8, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 6,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 7, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 5,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 6, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     }
>     ]
> }
>
> -----Ursprüngliche Nachricht-----
> Von: [hidden email] [mailto:[hidden email]] Im Auftrag von
> Stephan Ewen
> Gesendet: Sonntag, 29. Juni 2014 16:33
> An: [hidden email]
> Betreff: Re: Cluster execution of an example program ("Word count") and a
> problem related to the modificated example
>
> Hi Krzysztof!
>
> Indeed, the input size should not matter. Can you tell us the details of
> the setup that worked?
>
> The built-in examples work without distributed file system, because they
> do not depend on files. The example programs set a Java Collection as the
> input, which gets distributed as part of the program.
>
> Stephan
>
Reply | Threaded
Open this post in threaded view
|

AW: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Krzysztof Pasierbinski
Hi Stephan,
thank you for your support.
I don't have a dedicated master node. My master node is worker at the same time. The second node is a worker only. The small and big files have exactly the same structure (type, path, structure, even the name - only the size and values change). At first, the input file was only available at master node (and for small files it works!). After copying the input file to the second worker node it works fine but I don't think that is the effective way to go. So I am going to switch to HDFS.


-----Ursprüngliche Nachricht-----
Von: [hidden email] [mailto:[hidden email]] Im Auftrag von Stephan Ewen
Gesendet: Dienstag, 1. Juli 2014 14:58
An: [hidden email]
Betreff: Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Hey Krzysztof!

Everything looks standard there.

Let me ask those questions, to make sure I get the discussion right:
 - You are running a two node setup. Has one node the master and a worker, the other one has a worker only? Or do you have a dedicated master node?
 - Are the example on small data and on large data strictly the same, except for differently sized input files?

Most importantly:
 - It the input file is available on the workers, is it available under the path "/home/krzysztof/stratosphere05/generatedFrequencies2.txt" ?

My guess right now is still that there the workers do not see the file properly.

Greetings,
Stephan




On Sun, Jun 29, 2014 at 4:50 PM, Krzysztof Pasierbinski < [hidden email]> wrote:

> Hi Stephan,
> I have got 2 node configuration. The first node is a master and a
> worker, the second node is a worker. File path, Flink (Stratosphere)
> version and operation system is the same on both nodes.
> My test program is in the attachment (simple modification of "Word count"
> example).
> The execution plan looks like this:
> {
>     "nodes": [
>
>     {
>         "id": 4,
>         "type": "source",
>         "pact": "Data Source",
>         "contents": "TextInputFormat
> (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "3..6 GB" },
>             { "name": "Est. Cardinality", "value": "98.43 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "3..6 GB" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 3,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 2,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 3, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 1,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 2, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 0,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 1, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 8,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 7,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 8, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 6,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 7, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 5,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 6, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     }
>     ]
> }
>
> -----Ursprüngliche Nachricht-----
> Von: [hidden email] [mailto:[hidden email]] Im Auftrag
> von Stephan Ewen
> Gesendet: Sonntag, 29. Juni 2014 16:33
> An: [hidden email]
> Betreff: Re: Cluster execution of an example program ("Word count")
> and a problem related to the modificated example
>
> Hi Krzysztof!
>
> Indeed, the input size should not matter. Can you tell us the details
> of the setup that worked?
>
> The built-in examples work without distributed file system, because
> they do not depend on files. The example programs set a Java
> Collection as the input, which gets distributed as part of the program.
>
> Stephan
>
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Stephan Ewen
Hi!

Okay, good to hear you solved the issue. HDFS is a good way to go in large
setups, though shared filesystems / SANs that are mounted on all machines
work as well (using Amazon EBS is an example for that).

Stephan
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Fabian Hueske
I thought about why it had worked for small input files.

Obviously, all input split have been read from the (local) worker that is
on the same machine as the master (otherwise the remote worker would have
raised a FileNotFoundException). Since the input splits were very small,
reading the first input split might have been faster than the request of
the remote worker to the JobManager to assign an input split. Therefore,
the local worker could requested the next input split before the remote
worker and read the full file.

Once the input was larger, reading the first split took longer than the
request of the remote worker which got a split assigned and failed to read
the file.




2014-07-07 12:16 GMT+02:00 Stephan Ewen <[hidden email]>:

> Hi!
>
> Okay, good to hear you solved the issue. HDFS is a good way to go in large
> setups, though shared filesystems / SANs that are mounted on all machines
> work as well (using Amazon EBS is an example for that).
>
> Stephan
>
Reply | Threaded
Open this post in threaded view
|

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Stephan Ewen
Yes, I thought about that the same way. It will still generate two splits,
but they may end up at the same worker, if that worker is fast enough.

This sounds almost like a case for an "auto-dop" option for inputs.