This is my code:

    DataStream<Integer> someIntegers = env.fromElements(1, 2, 3, 4);

    SplitStream<Integer> s = someIntegers.split(new OutputSelector<Integer>() {
        List<String> out = new ArrayList<String>();

        @Override
        public Iterable<String> select(Integer arg0) {
            if (arg0 % 2 == 0) {
                out.add("even");
            } else {
                out.add("odd");
            }
            return out;
        }
    });

    s.select("even").flatMap(new FlatMapFunction<Integer, Tuple1<Integer>>() {
        @Override
        public void flatMap(Integer arg0, Collector<Tuple1<Integer>> arg1) throws Exception {
            System.out.println("---------------" + arg0);
            arg1.collect(new Tuple1<Integer>(arg0));
        }
    }).writeAsCsv("test", WriteMode.OVERWRITE, ",", "");

When I run this code:

1. A folder called "test" is created.
2. I get 4 files, named 1, 2, 3 and 4.
3. The contents are: file 1 contains 1; file 2 contains 3; file 3 contains 4; file 4 is empty.

My questions:

1. Even if I increase the number of elements in "fromElements", I get 4 files. Why does it always create 4 files?
2. The code should split the stream into odd and even, and right now I am printing only the "even" stream. So 3 should not be printed.

I could not make sense of this. Some help would be nice.
Hi madhairsilence,
1. The default parallelism of a job is the number of CPU cores on your machine, which is often 4. That means every operator except the source runs with parallelism 4. That is why you always get 4 files: there are 4 CSV sink tasks, and each one writes its own file. You can change this by calling `.setParallelism(1)` after `writeAsCsv(…)`.
2. I think it is a bug in your code. The `out` list should not be an instance variable; it should be a local variable inside the `select` method. Because the list is shared across calls, it keeps the tags added for earlier elements, so when the element "3" comes in, the result of the OutputSelector contains both "even" and "odd". That is why "3" is printed.

Hope this helps.

- Jark Wu

> On January 3, 2017, at 7:26 PM, madhairsilence <[hidden email]> wrote:
>
> fromElements
Spot on. Thanks for the explanation and for pointing out the bug in the code.
For developers who make the same silly mistake, the correct code is:

    List<String> out = new ArrayList<String>(); // remove this line

    @Override
    public Iterable<String> select(Integer arg0) {
        List<String> out = new ArrayList<String>(); // move it here
        if (arg0 % 2 == 0) {
            out.add("even");
        } else {
            out.add("odd");
        }
        return out;
    }
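For anyone who wants to see the effect without running Flink, here is a rough, Flink-free sketch of why the instance-level list misroutes elements. The `Selector` interface, the two selector classes and `routedToEven` are hypothetical stand-ins written for this illustration, not Flink APIs, and the sketch assumes a single selector instance sees the elements in order.

```java
import java.util.ArrayList;
import java.util.List;

public class SelectorStateDemo {

    /** Hypothetical stand-in for an output selector; not the Flink interface. */
    interface Selector {
        Iterable<String> select(Integer value);
    }

    /** Buggy variant: the list is a field, so tags from earlier calls pile up. */
    static class SharedListSelector implements Selector {
        private final List<String> out = new ArrayList<>();

        @Override
        public Iterable<String> select(Integer value) {
            out.add(value % 2 == 0 ? "even" : "odd");
            return out;
        }
    }

    /** Fixed variant: a fresh list per call, so each element carries only its own tag. */
    static class LocalListSelector implements Selector {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> out = new ArrayList<>();
            out.add(value % 2 == 0 ? "even" : "odd");
            return out;
        }
    }

    /** Returns the values whose selected tags include "even". */
    static List<Integer> routedToEven(Selector selector, int... values) {
        List<Integer> result = new ArrayList<>();
        for (int v : values) {
            for (String tag : selector.select(v)) {
                if (tag.equals("even")) {
                    result.add(v);
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: one selector instance handles 1, 2, 3, 4 in that order.
        System.out.println("shared list: " + routedToEven(new SharedListSelector(), 1, 2, 3, 4));
        System.out.println("local list:  " + routedToEven(new LocalListSelector(), 1, 2, 3, 4));
    }
}
```

Under that assumption the shared-list variant routes 2, 3 and 4 to "even", because once "even" has been added it stays in the list for every later element. The local-list variant routes only 2 and 4.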