flink1.11: windowing works with processing time, but not with event time

flink1.11: windowing works with processing time, but not with event time

taochanglian



Hello, I'd like to ask a question, thanks.
Very simple JSON:
{"num":100,"ts":1595949526874,"vin":"DDDD"}
{"num":200,"ts":1595949528874,"vin":"AAAA"}
{"num":200,"ts":1595949530880,"vin":"CCCC"}
{"num":300,"ts":1595949532883,"vin":"CCCC"}
{"num":100,"ts":1595949534888,"vin":"AAAA"}
{"num":300,"ts":1595949536892,"vin":"DDDD"}
I just want to window on event time. I have verified that processing time works, but event time simply produces nothing, which is frustrating. Any advice would be appreciated.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkKafka {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" +
                " ts BIGINT,\n" +
                " num INT,\n" +
                " vin STRING,\n" +
                " pts AS PROCTIME(),\n" +                                                          // processing-time attribute
                " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +  // event-time attribute derived from ts
                " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'kkb',\n" +
                " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" +
                " 'properties.group.id' = 'mm',\n" +
                " 'format' = 'json',\n" +
                " 'scan.startup.mode' = 'latest-offset'\n" +
                ")";
        tableEnv.executeSql(kafkaSourceTable);

        String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)";
        final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);

        windowAllTable.printSchema();
        tableEnv.toAppendStream(windowAllTable, Row.class).print();
        System.out.println("------------------------------------------------------");
        env.execute("job");
    }
}


---------------------------
As you can see, my query is String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)".
With TUMBLE(pts, INTERVAL '5' SECOND), i.e. windowing on processing time, everything works: all rows are printed every few seconds.
Printed output:
root
 |-- ts: BIGINT
 |-- num: INT
 |-- vin: STRING
 |-- pts: TIMESTAMP(3) NOT NULL *PROCTIME*
 |-- rowtime: TIMESTAMP(3) *ROWTIME*


------------------------------------------------------
11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29
7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27
7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31
12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33
11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37
2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35
11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39
1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43


But if I use TUMBLE(rowtime, INTERVAL '5' SECOND), i.e. windowing on event time, no results are ever produced; the job just keeps waiting.
The version is Flink 1.11.0.


Any guidance would be appreciated, thanks!
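For comparison, here is a minimal sketch of what an event-time version of the query could look like against the same kafkaSourceTable DDL; the per-vin SUM(num) aggregation and the window start/end columns are only illustrative and not part of the original query:

        // Sketch only: aggregate per vin over 5-second event-time tumbling windows.
        String eventTimeWindowSql =
                "SELECT vin, " +
                "       TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS w_start, " +
                "       TUMBLE_END(rowtime, INTERVAL '5' SECOND) AS w_end, " +
                "       SUM(num) AS total_num " +
                "FROM kafkaSourceTable " +
                "GROUP BY vin, TUMBLE(rowtime, INTERVAL '5' SECOND)";
        Table eventTimeWindowTable = tableEnv.sqlQuery(eventTimeWindowSql);
        tableEnv.toAppendStream(eventTimeWindowTable, Row.class).print();

An event-time window like this only fires once the watermark passes the end of the window, so if the watermark never advances (see the replies below) no rows will ever be emitted.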








 

Re: flink1.11: windowing works with processing time, but not with event time

Benchao Li-2
This is usually because there is some problem with your watermark generation; you can check in the web UI whether the watermark is advancing normally.
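One common cause with a Kafka source is that the topic has more partitions than there are rows arriving, so some partitions stay idle and hold back the overall watermark. If that is the case here, a possible fix is the table.exec.source.idle-timeout option available in Flink 1.11; a sketch:

        // Sketch: exclude partitions that produce no data from watermark calculation
        // after 10 s of inactivity. The 10 s value is only illustrative.
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "10 s");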




--

Best,
Benchao Li

Re: flink1.11: windowing works with processing time, but not with event time

何会远
Don't you need to set the TimeCharacteristic for this?
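(For reference, in Flink 1.11 the DataStream-level default is still processing time; setting it explicitly would look like the sketch below. The call was deprecated in 1.12, where event time became the default.)

        // Sketch: switch the environment to event time explicitly.
        env.setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic.EventTime);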






Re: flink1.11: windowing works with processing time, but not with event time

hechao
In reply to this post by taochanglian
Most of our jobs process data with event time. To use event time, you need to set it up in three places (a consolidated sketch follows after the three steps):

First:

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Second:

SingleOutputStreamOperator<Object> add_event_time = hitchSPVLoggerSingleOutputStreamOperator
        .uid("add event time")
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Object>(
                        org.apache.flink.streaming.api.windowing.time.Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Object o) {
                        return o.timestamp; // return the record's event-time field here; Object stands for our own event type
                    }
                })
        .setParallelism(sinkParallelism);

Third:

// list the stream's existing fields first, then append a rowtime attribute named rt
tableEnv.registerDataStream("hitch_match_result", add_event_time, "field1, field2, rt.rowtime");

Then just use rt as the event-time attribute.
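Putting the three steps together, here is a rough sketch of how they would fit in one Flink 1.11 program; the Event type, the field names, and the events stream are only placeholders for whatever the actual job uses:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Illustrative event type standing in for the real one.
public class Event {
    public int num;
    public String vin;
    public long ts; // event time in epoch milliseconds
}

// 1. Use event time (env and tableEnv are the same objects as in the original program).
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 2. Assign timestamps and generate watermarks (0 seconds of allowed out-of-orderness).
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(Event e) {
                return e.ts;
            }
        });

// 3. Register the stream as a table, appending a rowtime attribute named rt.
tableEnv.registerDataStream("events_table", withTimestamps, "num, vin, rt.rowtime");

// rt can now be used in event-time windows, e.g. TUMBLE(rt, INTERVAL '5' SECOND).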


If anything here is wrong, please point it out, thanks.





Re: flink1.11: windowing works with processing time, but not with event time

Leonard Xu
Hi, taochanglian

The [hidden email] list is used to discuss Flink development topics such as new features, votes, and releases; it is not the right place for user questions.
Please send user questions to [hidden email]; if you prefer Chinese, please use [hidden email]. You can refer to [1] for more details.

Best
Leonard Xu
[1] https://flink.apache.org/community.html#mailing-lists




Re: flink1.11: windowing works with processing time, but not with event time

Leonard Xu
Sorry for the typo.

The [hidden email] list is used to discuss Flink development topics such as new features, votes, and releases; it is not the right place for user questions.
Please send user questions to [hidden email]; if you prefer Chinese, please use [hidden email]. You can refer to [1] for more details.

