[jira] [Created] (FLINK-21013) Blink planner does not ingest timestamp into StreamRecord

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-21013) Blink planner does not ingest timestamp into StreamRecord

Shang Yuanchun (Jira)
Timo Walther created FLINK-21013:
------------------------------------

             Summary: Blink planner does not ingest timestamp into StreamRecord
                 Key: FLINK-21013
                 URL: https://issues.apache.org/jira/browse/FLINK-21013
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner, Table SQL / Runtime
            Reporter: Timo Walther


Currently, the rowtime attribute is not put into the StreamRecord when leaving the Table API to DataStream API. The legacy planner supports this, but the timestamp is null when using the Blink planner.

{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
            EnvironmentSettings settings =
                    EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStream<Order> orderA =
                env.fromCollection(
                        Arrays.asList(
                                new Order(1L, "beer", 3),
                                new Order(1L, "diaper", 4),
                                new Order(3L, "rubber", 2)));

        DataStream<Order> orderB =
                orderA.assignTimestampsAndWatermarks(
                        new AssignerWithPunctuatedWatermarks<Order>() {
                            @Nullable
                            @Override
                            public Watermark checkAndGetNextWatermark(
                                    Order lastElement, long extractedTimestamp) {
                                return new Watermark(extractedTimestamp);
                            }

                            @Override
                            public long extractTimestamp(Order element, long recordTimestamp) {
                                return element.user;
                            }
                        });

        Table tableA = tEnv.fromDataStream(orderB, $("user").rowtime(), $("product"), $("amount"));

        // union the two tables
        Table result = tEnv.sqlQuery("SELECT * FROM " + tableA);

        tEnv.toAppendStream(result, Row.class)
                .process(
                        new ProcessFunction<Row, Row>() {
                            @Override
                            public void processElement(Row value, Context ctx, Collector<Row> out)
                                    throws Exception {
                                System.out.println("TIMESTAMP" + ctx.timestamp());
                            }
                        })
                .print();

        env.execute();
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)