Dian Fu created FLINK-22082:
-------------------------------

             Summary: Nested projection push down doesn't work for data: row(array(row))
                 Key: FLINK-22082
                 URL: https://issues.apache.org/jira/browse/FLINK-22082
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.0, 1.13.0
            Reporter: Dian Fu


For the following job:
{code}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, StreamTableEnvironment

config = TableConfig()
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env, config)

source_ddl = """
    CREATE TABLE InTable (
        `ID` STRING,
        `Timestamp` TIMESTAMP(3),
        `Result` ROW(
            `data` ROW(`value` BIGINT) ARRAY),
        WATERMARK FOR `Timestamp` AS `Timestamp`
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'json',
        'path' = '/tmp/1.txt'
    )
    """

sink_ddl = """
    CREATE TABLE OutTable (
        `ID` STRING,
        `value` BIGINT
    ) WITH (
        'connector' = 'print'
    )
    """

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

table = t_env.from_path('InTable')
table \
    .select(
        table.ID,
        table.Result.data.at(1).value) \
    .execute_insert('OutTable') \
    .wait()
{code}

It throws the following exception:
{code}
: scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
	at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
	at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
	at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
	at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
	at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
{code}

See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)