I use Flink streaming SQL to write a demo of "group by". The records are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and sum the second element.

Every time I run the program, the result is different. It seems that the records are out of order. Sometimes a record even seems to be lost. I am confused about that.

The code is as below:

    public class Test {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

            DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
                    Tuple2.of("bj", 1L),
                    Tuple2.of("bj", 3L),
                    Tuple2.of("bj", 5L));
            tEnv.registerDataStream("person", dataStream);

            String sql = "select f0, sum(f1) from person group by f0";
            Table table = tEnv.sqlQuery(sql);
            tEnv.toRetractStream(table, Row.class).print();

            env.execute();
        }
    }

The results may be as below:

    1> (true,bj,1)
    1> (false,bj,1)
    1> (true,bj,4)
    1> (false,bj,4)
    1> (true,bj,9)

    1> (true,bj,5)
    1> (false,bj,5)
    1> (true,bj,8)
    1> (false,bj,8)
    1> (true,bj,9)
Hi,

A GROUP BY query on a streaming table requires that the result is continuously updated. Updates are propagated as a retraction stream (see tEnv.toRetractStream(table, Row.class).print(); in your code).

A retraction stream encodes the type of each update as a boolean flag, the "true" and "false" values in your result. "true" means the record was added to the result, "false" means the record was removed from the result. If you follow the output to the end, the final result is the same in both cases: (bj, 9).

The different "result paths" are caused by the parallel (multi-threaded) processing of the query. If you set the parallelism to 1 (env.setParallelism(1);), the "result path" should be the same every time.

Best, Fabian

On Fri, 13 Sep 2019 at 10:02, 刘建刚 <[hidden email]> wrote:

> I use flink stream sql to write a demo about "group by". The
> records are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and
> sum the second element.
> Every time I run the program, the result is different. It seems that
> the records are out of order. Even sometimes record is lost. I am confused
> about that.
> The code is as below:
>
> public class Test {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
>
>         DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
>                 Tuple2.of("bj", 1L),
>                 Tuple2.of("bj", 3L),
>                 Tuple2.of("bj", 5L));
>         tEnv.registerDataStream("person", dataStream);
>
>         String sql = "select f0, sum(f1) from person group by f0";
>         Table table = tEnv.sqlQuery(sql);
>         tEnv.toRetractStream(table, Row.class).print();
>
>         env.execute();
>     }
> }
>
> The results may be as below:
> 1> (true,bj,1)
> 1> (false,bj,1)
> 1> (true,bj,4)
> 1> (false,bj,4)
> 1> (true,bj,9)
>
> 1> (true,bj,5)
> 1> (false,bj,5)
> 1> (true,bj,8)
> 1> (false,bj,8)
> 1> (true,bj,9)
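
To make the point of the reply concrete, here is a minimal sketch in plain Java (no Flink dependency) that simulates the two outputs from the question. It is only an illustration under stated assumptions: the class RetractFold and its methods runningSum and materialize are hypothetical names made up for this example and are not part of Flink. The sketch assumes that in the second run the records happened to reach the aggregation in the order 5, 3, 1, which is consistent with the sums 5, 8, 9 in the posted output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a retract stream; this is NOT Flink code.
public class RetractFold {

    /** One retract-stream message: add == true inserts a row, add == false retracts it. */
    public static final class Update {
        final boolean add;
        final String key;
        final long sum;

        Update(boolean add, String key, long sum) {
            this.add = add;
            this.key = key;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "(" + add + "," + key + "," + sum + ")";
        }
    }

    /** Messages a per-key running SUM would produce if the values arrive in the given order. */
    public static List<Update> runningSum(String key, long... values) {
        List<Update> out = new ArrayList<>();
        long sum = 0;
        boolean first = true;
        for (long v : values) {
            if (!first) {
                out.add(new Update(false, key, sum)); // retract the previous result row
            }
            sum += v;
            out.add(new Update(true, key, sum));      // emit the updated result row
            first = false;
        }
        return out;
    }

    /** Applies the add/retract messages in order and returns the resulting table. */
    public static Map<String, Long> materialize(List<Update> updates) {
        Map<String, Long> view = new HashMap<>();
        for (Update u : updates) {
            if (u.add) {
                view.put(u.key, u.sum);
            } else {
                view.remove(u.key, u.sum); // removes only if the stored row matches
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Update> runA = runningSum("bj", 1, 3, 5); // arrival order of the first output
        List<Update> runB = runningSum("bj", 5, 3, 1); // assumed arrival order of the second output
        System.out.println(runA + " -> " + materialize(runA));
        System.out.println(runB + " -> " + materialize(runB));
    }
}
```

Both lines printed by main end in {bj=9}: the intermediate messages differ with the arrival order, but no record is lost, and the table obtained by applying all messages is identical.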