Jason Sinn created FLINK-16552:
---------------------------------- Summary: Cannot include Option fields in any Table join Key: FLINK-16552 URL: https://issues.apache.org/jira/browse/FLINK-16552 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.7.2 Reporter: Jason Sinn The table API currently fails joins where one of the tables has an option type, even though it is not in the join condition. A reproducible test case: {code:java} object TestJoinWithOption { case class JoinOne(joinKeyOne: String, otherFieldOne: Option[Int]) case class JoinTwo(joinKeyTwo: String, otherFieldTwo: Option[Int]) def main(args: Array[String]): Unit = { val sEnv = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(sEnv) val testStream1 = sEnv.fromCollection(Seq(JoinOne("key", Some(1)))) val testStream2 = sEnv.fromCollection(Seq(JoinTwo("key", Some(2)))) val t1 = tEnv.fromDataStream(testStream1) val t2 = tEnv.fromDataStream(testStream2) val result = t1.join(t2, "joinKeyOne = joinKeyTwo") result.toAppendStream[Row].print() sEnv.execute() } } {code} Result: {code:java} Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be used in a join operation because it does not implement a proper hashCode() method.Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be used in a join operation because it does not implement a proper hashCode() method. at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:174) at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153) at org.apache.flink.table.typeutils.TypeCheckUtils$$anonfun$validateEqualsHashCode$1.apply$mcVI$sp(TypeCheckUtils.scala:149) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147) at org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:56) at org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:45) at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:112) {code} It seems as though this issue has been brought up before in the streams API here: https://issues.apache.org/jira/browse/FLINK-2673 Expected behaviour: Since the join condition does not contain the option, the resulting schema should look like this (Actually, this was created by result.printSchema) {code:java} root |-- joinKeyOne: String |-- otherFieldOne: Option[Integer] |-- joinKeyTwo: String |-- otherFieldTwo: Option[Integer] {code} Actual behaviour: Runtime exception is thrown above. -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |