Timo Walther created FLINK-21013:
------------------------------------ Summary: Blink planner does not ingest timestamp into StreamRecord Key: FLINK-21013 URL: https://issues.apache.org/jira/browse/FLINK-21013 Project: Flink Issue Type: Bug Components: Table SQL / Planner, Table SQL / Runtime Reporter: Timo Walther Currently, the rowtime attribute is not put into the StreamRecord when leaving the Table API to DataStream API. The legacy planner supports this, but the timestamp is null when using the Blink planner. {code} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); DataStream<Order> orderA = env.fromCollection( Arrays.asList( new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2))); DataStream<Order> orderB = orderA.assignTimestampsAndWatermarks( new AssignerWithPunctuatedWatermarks<Order>() { @Nullable @Override public Watermark checkAndGetNextWatermark( Order lastElement, long extractedTimestamp) { return new Watermark(extractedTimestamp); } @Override public long extractTimestamp(Order element, long recordTimestamp) { return element.user; } }); Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), $("product"), $("amount")); // union the two tables Table result = tEnv.sqlQuery("SELECT * FROM " + tableA); tEnv.toAppendStream(result, Row.class) .process( new ProcessFunction<Row, Row>() { @Override public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception { System.out.println("TIMESTAMP" + ctx.timestamp()); } }) .print(); env.execute(); {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |