Hello, I have a question. Thanks in advance.

Very simple JSON:

{"num":100,"ts":1595949526874,"vin":"DDDD"}
{"num":200,"ts":1595949528874,"vin":"AAAA"}
{"num":200,"ts":1595949530880,"vin":"CCCC"}
{"num":300,"ts":1595949532883,"vin":"CCCC"}
{"num":100,"ts":1595949534888,"vin":"AAAA"}
{"num":300,"ts":1595949536892,"vin":"DDDD"}

I just want to window on event time. I have verified that windowing on processing time works, but with event time nothing ever comes out, which is frustrating. Please advise.

public class FlinkKafka {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" +
                "  ts BIGINT,\n" +
                "  num INT,\n" +
                "  vin STRING,\n" +
                "  pts AS PROCTIME(),\n" +                       // processing time
                "  rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
                "  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'kkb',\n" +
                "  'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" +
                "  'properties.group.id' = 'mm',\n" +
                "  'format' = 'json',\n" +
                "  'scan.startup.mode' = 'latest-offset'\n" +
                ")";
        tableEnv.executeSql(kafkaSourceTable);

        String queryWindowAllDataSql = "SELECT * FROM kafkaSourceTable " +
                "GROUP BY ts, num, vin, pts, rowtime, TUMBLE(pts, INTERVAL '5' SECOND)";
        final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);

        windowAllTable.printSchema();
        tableEnv.toAppendStream(windowAllTable, Row.class).print();
        System.out.println("------------------------------------------------------");
        env.execute("job");
    }
}

---------------------------
As shown above, the query is

SELECT * FROM kafkaSourceTable GROUP BY ts, num, vin, pts, rowtime, TUMBLE(pts, INTERVAL '5' SECOND)

With TUMBLE(pts, INTERVAL '5' SECOND), i.e. windowing on processing time, there is no problem at all: every few seconds all the rows are printed.

Printed output:

root
 |-- ts: BIGINT
 |-- num: INT
 |-- vin: STRING
 |-- pts: TIMESTAMP(3) NOT NULL *PROCTIME*
 |-- rowtime: TIMESTAMP(3) *ROWTIME*

------------------------------------------------------
11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29
7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27
7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31
12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33
11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37
2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35
11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39
1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43

But if I use TUMBLE(rowtime, INTERVAL '5' SECOND), i.e. window on event time, no result is ever produced; the job just keeps waiting.

The Flink version is 1.11.0.

Any guidance would be appreciated, thanks!
This is usually because your watermark generation has some problem. You can check in the web UI whether the watermark is advancing normally.
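For illustration, a minimal sketch of two things worth trying inside the original main() — both rest on an assumption not confirmed in the thread: if the topic 'kkb' has fewer partitions than the job's parallelism, the idle source subtasks never emit watermarks, so the global watermark never advances and the event-time window never fires.

// Assumption: topic 'kkb' has fewer partitions than the default parallelism,
// so some source subtasks stay idle and hold the global watermark back.
// Option 1: match the parallelism to the partition count (1 here as an example).
env.setParallelism(1);

// Option 2 (assumption: the idle-timeout option is available in this Flink version;
// please verify against the 1.11 configuration docs): mark source subtasks as idle
// after a timeout so they no longer block the watermark.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "10 s");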
--
Best,
Benchao Li
Don't you need to set the time characteristic for this?
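For reference, the change being asked about would be a single line in the original main(), right after the execution environment is created — a sketch only; the thread does not confirm whether it is actually required when the watermark is declared in the DDL:

// Assumption: event-time semantics must be enabled explicitly on this version.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // org.apache.flink.streaming.api.TimeCharacteristic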
何会远
[hidden email]
In reply to this post by taochanglian
Here we mostly process with event time. To use event time, it has to be set up in three places:
First:

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Second:

// <EventRecord> stands in for the poster's own event class; it must expose the event timestamp.
SingleOutputStreamOperator<EventRecord> add_event_time = hitchSPVLoggerSingleOutputStreamOperator
        .uid("add event time")
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<EventRecord>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(EventRecord o) {
                        return o.timestamp; // event time in epoch millis
                    }
                })
        .setParallelism(sinkParallelism);

Third:

// Register the stream, listing its fields and appending an event-time attribute rt.
// field1, field2 are placeholders for the stream's actual fields.
tableEnv.registerDataStream("hitch_match_result", add_event_time, "field1, field2, rt.rowtime");

Then just use rt in the window.

If anything here is wrong, please point it out. Thanks.

hechao
[hidden email]
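Tying the three steps above back to the original job's fields (ts, num, vin), here is a rough, self-contained sketch of the DataStream-based route — assumptions: Flink 1.11 bridge APIs, an in-memory source standing in for Kafka, and a simplified aggregate query; the class name EventTimeWindowSketch and the view name vehicle_events are made up for illustration.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EventTimeWindowSketch {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 1: switch the job to event time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // A tiny in-memory stand-in for the Kafka source: (ts, num, vin).
        DataStream<Tuple3<Long, Integer, String>> events = env.fromElements(
                Tuple3.of(1595949526874L, 100, "DDDD"),
                Tuple3.of(1595949528874L, 200, "AAAA"),
                Tuple3.of(1595949536892L, 300, "DDDD"));

        // Step 2: assign event-time timestamps and watermarks from the ts field.
        DataStream<Tuple3<Long, Integer, String>> withTimestamps = events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, Integer, String>>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(Tuple3<Long, Integer, String> e) {
                        return e.f0; // epoch millis
                    }
                });

        // Step 3: register the stream, appending a rowtime attribute rt derived from the
        // timestamps assigned above.
        Table events3 = tableEnv.fromDataStream(withTimestamps, "ts, num, vin, rt.rowtime");
        tableEnv.createTemporaryView("vehicle_events", events3);

        // Event-time tumbling window over rt.
        tableEnv.executeSql(
                "SELECT vin, SUM(num) AS total, TUMBLE_END(rt, INTERVAL '5' SECOND) AS w_end " +
                "FROM vehicle_events GROUP BY vin, TUMBLE(rt, INTERVAL '5' SECOND)")
                .print();
    }
}

Note that this bounded example source emits a final watermark at end of input, which is why the window fires; with the real Kafka source the watermark only advances while new records keep arriving.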
Hi, taochanglian
The [hidden email] list is used to discuss Flink development topics such as new features, votes, and releases, so it is not the right place for user questions. Please ask user questions on [hidden email]; if you prefer Chinese, ask on [hidden email]. You can refer to [1] for more details.

Best
Leonard Xu

[1] https://flink.apache.org/community.html#mailing-lists
Sorry for the typo.
The [hidden email] list is used to discuss Flink development topics such as new features, votes, and releases, so it is not the right place for user questions. Please ask user questions on [hidden email]; if you prefer Chinese, ask on [hidden email]. You can refer to [1] for more details.

[1] https://flink.apache.org/community.html#mailing-lists