zhengbm created FLINK-11862:
-------------------------------

Summary: 在同一条流上进行多次不同的sql,第二个sql的where条件不可用
Key: FLINK-11862
URL: https://issues.apache.org/jira/browse/FLINK-11862
Project: Flink
Issue Type: Bug
Components: API / Table SQL
Affects Versions: 1.7.2
Environment: flink 1.7版本 java 1.8
Reporter: zhengbm

List<String> fields = Lists.newArrayList("rawMessage","timestamp");
Schema schema = new Schema();
for (int i = 0; i < fields.size(); i++) {
    schema.field(fields.get(i), Types.STRING()).from(fields.get(i));
}
tableEnvironment.connect(new Kafka()
        .version("0.8")
        .properties(properties)
        .topic("raw_playtime_h5_source")
        .startFromLatest()
    )
    .withFormat(new Json().failOnMissingField(false).deriveSchema())
    .withSchema(schema)
    .inAppendMode()
    .registerTableSource("t1");
Table table2 = tableEnvironment
    .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as flash from t1 ,LATERAL TABLE(split(rawMessage,'\\t')) as T(maps) ");
tableEnvironment.registerTable("t2", table2);
Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 where maps_length>0");
TypeInformation typeInformation = table.getSchema().toRowType();
String[] columns = table.getSchema().getFieldNames();
DataStream<String> dataStream = tableEnvironment
    .toAppendStream(table, typeInformation)
    .map(new PhysicTransformMap(columns, 0));
dataStream.print();
try {
    env.execute();
} catch (Exception e) {
    e.printStackTrace();
}

注:kafka中的数据流格式如下 {"timestamp" : "xxxx","rawMessage":"xxx\txxx\txxxx\t"}

Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Invalid input access.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
    at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
    at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
    at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)

--
This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |