Datastream splitter - Confusing behaviour

madhairsilence
This is my code:

DataStream<Integer> someIntegers = env.fromElements(1, 2, 3, 4);

SplitStream<Integer> s = someIntegers.split(new OutputSelector<Integer>() {

    List<String> out = new ArrayList<String>();

    @Override
    public Iterable<String> select(Integer arg0) {
        if (arg0 % 2 == 0) {
            out.add("even");
        } else {
            out.add("odd");
        }
        return out;
    }
});

s.select("even").flatMap(new FlatMapFunction<Integer, Tuple1<Integer>>() {
    @Override
    public void flatMap(Integer arg0, Collector<Tuple1<Integer>> arg1) throws Exception {
        System.out.println("---------------" + arg0);
        arg1.collect(new Tuple1<Integer>(arg0));
    }
}).writeAsCsv("test", WriteMode.OVERWRITE, ",", "");


When I run this code:

1. A folder called "test" is created.
2. I get 4 files, named 1, 2, 3 and 4.
3. Their contents are: file 1 contains 1; file 2 contains 3; file 3 contains 4; file 4 is empty.

My questions:
1. Even if I increase the number of elements passed to "fromElements", I still get 4 files. Why does it always create 4 files?
2. The code should split the stream into odd and even, and right now I am printing only the "even" stream. So 3 should not be printed. Why is it?

I could not make sense of this. Some help would be nice.

Re: Datastream splitter - Confusing behaviour

伍翀(云邪)
Hi madhairsilence,

1. The default parallelism of a job is the number of CPU cores on your machine, which is often 4. That means every operator except the source runs with parallelism 4.
    That is why you always get 4 files: there are 4 CSV sink tasks, and each writes its own file. You can change this by calling `.setParallelism(1)` after `writeAsCsv(…)`.
2. I think it's a bug in your code. The `out` list should not be an instance variable; it should be a local variable in the `select` method. As an instance variable it is never cleared, so it keeps the names added for earlier elements. When the element "3" comes in, the result of the OutputSelector
   contains both "even" and "odd", which is why "3" is printed.
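
The accumulation described in point 2 can be reproduced without Flink. The sketch below is only an illustration: `SharedListDemo`, `BuggySelector` and `namesForLast` are made-up names, and the class is a plain-Java stand-in for the anonymous OutputSelector in the question, assuming a single selector instance sees the elements in order.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedListDemo {

    // Hypothetical stand-in for the anonymous OutputSelector in the question:
    // the list is a field, so it survives across calls to select().
    static class BuggySelector {
        private final List<String> out = new ArrayList<>();

        Iterable<String> select(Integer value) {
            out.add(value % 2 == 0 ? "even" : "odd");
            return out; // still holds the names added for earlier values
        }
    }

    // Feeds the values through one selector instance, in order, and returns
    // a copy of what select() returned for the last value.
    static List<String> namesForLast(int... values) {
        BuggySelector selector = new BuggySelector();
        List<String> last = new ArrayList<>();
        for (int v : values) {
            last = new ArrayList<>();
            for (String name : selector.select(v)) {
                last.add(name);
            }
        }
        return last;
    }

    public static void main(String[] args) {
        // Names returned for 3 after 1 and 2 have already been seen.
        System.out.println(namesForLast(1, 2, 3)); // prints [odd, even, odd]
    }
}
```

Because the names returned for 3 include "even", a downstream `select("even")` would receive 3 as well.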

Hope this helps you.

- Jark Wu

> On January 3, 2017, at 7:26 PM, madhairsilence <[hidden email]> wrote:
>
> fromElements

Re: Datastream splitter - Confusing behaviour

madhairsilence
Spot on. Thanks for the explanation and for pointing out the bug in the code.

For developers who make the same silly mistake:

The correct code is:

List<String> out = new ArrayList<String>(); // remove this line

@Override
public Iterable<String> select(Integer arg0) {
    List<String> out = new ArrayList<String>(); // move it here
    if (arg0 % 2 == 0) {
        out.add("even");
    } else {
        out.add("odd");
    }
    return out;
}
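
Since each element gets exactly one name here, a variant of the same fix is to return a fresh one-element list and keep no list variable at all. This is only a sketch, not something proposed in the thread: `StatelessSelectDemo` is a made-up name, and the static `select` below is a plain-Java stand-in for the body of the OutputSelector's `select` method.

```java
import java.util.Collections;
import java.util.List;

public class StatelessSelectDemo {

    // Same odd/even rule as the corrected select() above, but nothing is
    // stored between calls: every call returns a new one-element list.
    static List<String> select(Integer value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }

    public static void main(String[] args) {
        for (int v = 1; v <= 4; v++) {
            System.out.println(v + " -> " + select(v));
        }
    }
}
```

With no state held between calls, the names returned for one element cannot leak into the result for the next.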