Wu created FLINK-20637:
--------------------------

             Summary: Converting a Table to a DataStream twice results in two data streams
                 Key: FLINK-20637
                 URL: https://issues.apache.org/jira/browse/FLINK-20637
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream, Connectors / Kafka, Table SQL / API, Table SQL / Planner
    Affects Versions: 1.11.2
            Reporter: Wu


Code:
{code:java}
// code placeholder
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(50000);
env.setParallelism(10);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

tableEnv.executeSql("create table feeds_expose_click_profile ( docId string ,buuid string ,predictId string ,docType int ,clickLabel int ,viewTime int ,exposeEventTime bigint ,clickEventTime string ,authorId string ,category string ,subCategory string ,keywords string ,tags string, eventTime bigint, rowTime as TO_TIMESTAMP(from_unixtime(eventTime / 1000)), WATERMARK FOR rowTime AS rowTime - INTERVAL '5' SECOND) WITH ('connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'false' )");

// First conversion: the full table to a retract stream.
Table table = tableEnv.from("feeds_expose_click_profile");
TypeInformation<Row> typeInfo = table.getSchema().toRowType();
DataStream dataStream = tableEnv
        .toRetractStream(table, typeInfo)
        .filter(row -> row.f0)
        .map(row -> row.f1)
        .returns(typeInfo);

// Second conversion: a projection of the same table to another retract stream.
Table tableFilter = tableEnv.sqlQuery("select buuid, authorId, viewTime, rowTime from feeds_expose_click_profile");
tableEnv.createTemporaryView("tableFilter", tableFilter);
TypeInformation<Row> typeInfo1 = tableFilter.getSchema().toRowType();
DataStream dataStream1 = tableEnv
        .toRetractStream(tableFilter, typeInfo1)
        .filter(row -> row.f0)
        .map(row -> row.f1)
        .returns(typeInfo1);

dataStream1.print();
System.out.println(env.getExecutionPlan());
{code}
ExecutionPlan:
{code:java}
// code placeholder
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
    "pact" : "Data Source",
    "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
    "parallelism" : 10
  }, {
    "id" : 2,
    "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
    "pact" : "Operator",
    "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 1, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 3,
    "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
    "pact" : "Operator",
    "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 2, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 4,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 3, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 5,
    "type" : "Filter",
    "pact" : "Operator",
    "contents" : "Filter",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 4, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 6,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 5, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 7,
    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
    "pact" : "Data Source",
    "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, feeds_expose_click_profile]], fields=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime])",
    "parallelism" : 10
  }, {
    "id" : 8,
    "type" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
    "pact" : "Operator",
    "contents" : "Calc(select=[docId, buuid, predictId, docType, clickLabel, viewTime, exposeEventTime, clickEventTime, authorId, category, subCategory, keywords, tags, eventTime, TO_TIMESTAMP(FROM_UNIXTIME((eventTime / 1000))) AS rowTime])",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 7, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 9,
    "type" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
    "pact" : "Operator",
    "contents" : "WatermarkAssigner(rowtime=[rowTime], watermark=[(rowTime - 5000:INTERVAL SECOND)])",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 8, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 10,
    "type" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
    "pact" : "Operator",
    "contents" : "Calc(select=[buuid, authorId, viewTime, rowTime])",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 9, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 11,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 10, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 12,
    "type" : "Filter",
    "pact" : "Operator",
    "contents" : "Filter",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 11, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 13,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 12, "ship_strategy" : "FORWARD", "side" : "second" } ]
  }, {
    "id" : 14,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 10,
    "predecessors" : [ { "id" : 13, "ship_strategy" : "FORWARD", "side" : "second" } ]
  } ]
}
{code}
I encountered this problem while using Waterdrop. How can it be fixed?
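As far as I can tell, with the 1.11 bridge each toRetractStream call is translated independently, which is why the Kafka source appears twice in the plan (ids 1 and 7 above). A possible workaround, sketched below under the assumption that the same tableEnv and DDL as in the snippet above are used, is to convert the table to a DataStream only once and derive the projected stream from that single stream. The field positions (buuid = 1, viewTime = 5, authorId = 8, rowTime = 14) simply follow the declared column order and are illustrative, not taken from a tested job.

{code:java}
// Possible workaround (sketch, assuming the tableEnv and DDL from the snippet above):
// convert the table once, then derive the projected stream from the already
// converted DataStream instead of calling toRetractStream a second time.
Table table = tableEnv.from("feeds_expose_click_profile");
TypeInformation<Row> typeInfo = table.getSchema().toRowType();

DataStream<Row> dataStream = tableEnv
        .toRetractStream(table, typeInfo)
        .filter(row -> row.f0)
        .map(row -> row.f1)
        .returns(typeInfo);

// Project buuid (1), authorId (8), viewTime (5), rowTime (14) from the single
// converted stream, so only one TableSourceScan should appear in the plan.
// The timestamp type may need adjusting (e.g. Types.LOCAL_DATE_TIME) depending
// on how the rowtime attribute is converted.
DataStream<Row> dataStream1 = dataStream
        .map(row -> Row.of(row.getField(1), row.getField(8), row.getField(5), row.getField(14)))
        .returns(Types.ROW(Types.STRING, Types.STRING, Types.INT, Types.SQL_TIMESTAMP));

dataStream1.print();
{code}

If upgrading is an option, newer releases also provide StreamStatementSet#attachAsDataStream, which lets the planner optimize several Table-to-DataStream conversions as a single pipeline.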
--
This message was sent by Atlassian Jira
(v8.3.4#803005)