hehuiyuan created FLINK-18145:
--------------------------------- Summary: Segment optimization does not work ? Key: FLINK-18145 URL: https://issues.apache.org/jira/browse/FLINK-18145 Project: Flink Issue Type: Wish Reporter: hehuiyuan DAG Segement Optimization: !image-2020-06-05-14-40-03-123.png|width=569,height=226! Code: {code:java} StreamExecutionEnvironment env = EnvUtil.getEnv(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,bsSettings); GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0); tableEnv.registerTableSource("myTble",tableSource); Table mytable = tableEnv.scan("myTble"); mytable.printSchema(); tableEnv.toAppendStream(mytable,Row.class).addSink(new PrintSinkFunction<>()).setParallelism(2); Table tableproc = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey,TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '30' SECOND) ,key"); tableproc.printSchema(); tableEnv.registerTable("t4",tableproc); Table table = tableEnv.sqlQuery("SELECT key,count(rowtime_string) as countkey,TUMBLE_START(proctime, INTERVAL '24' HOUR) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '24' HOUR) ,key"); table.printSchema(); tableEnv.registerTable("t3",table); String[] fields = new String[]{"key","countkey","tumblestart"}; TypeInformation[] fieldsType = new TypeInformation[3]; fieldsType[0] = Types.INT; fieldsType[1] = Types.LONG; fieldsType[2] = Types.SQL_TIMESTAMP; PrintTableUpsertSink printTableSink = new PrintTableUpsertSink(fields,fieldsType,true); tableEnv.registerTableSink("inserttable",printTableSink); tableEnv.sqlUpdate("insert into inserttable select key,countkey,tumblestart from t3"); String[] fieldsproc = new String[]{"key","countkey","tumblestart"}; TypeInformation[] fieldsTypeproc = new TypeInformation[3]; fieldsTypeproc[0] = Types.INT; fieldsTypeproc[1] = Types.LONG; fieldsTypeproc[2] = Types.SQL_TIMESTAMP; PrintTableUpsertSink printTableSinkproc = new PrintTableUpsertSink(fieldsproc,fieldsTypeproc,true); tableEnv.registerTableSink("inserttableproc",printTableSinkproc); tableEnv.sqlUpdate("insert into inserttableproc select key,countkey,tumblestart from t4"); {code} I have a custom table source , then (1) transform datastream to use `toAppendStream` method , then sink (2) use tumble ,then sink (3) use another tumbel ,then sink but the segement optimization did't work. *!image-2020-06-05-14-50-33-759.png|width=458,height=336!* *The source is executed by 3 threads and generate duplicate data for 3 times* !image-2020-06-05-14-53-57-056.png|width=1216,height=204! -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |