[jira] [Created] (FLINK-16552) Cannot include Option fields in any Table join

Shang Yuanchun (Jira)
Jason Sinn created FLINK-16552:
----------------------------------

             Summary: Cannot include Option fields in any Table join
                 Key: FLINK-16552
                 URL: https://issues.apache.org/jira/browse/FLINK-16552
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.7.2
            Reporter: Jason Sinn


The Table API currently fails any join in which one of the input tables has a field of type `Option`, even when that field is not part of the join condition. A reproducible test case:

 
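{code:java}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TestJoinWithOption {
  // Each input has an Option[Int] field that is NOT used in the join condition.
  case class JoinOne(joinKeyOne: String, otherFieldOne: Option[Int])
  case class JoinTwo(joinKeyTwo: String, otherFieldTwo: Option[Int])

  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Flink 1.7 factory for a Scala StreamTableEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(sEnv)

    val testStream1 = sEnv.fromCollection(Seq(JoinOne("key", Some(1))))
    val testStream2 = sEnv.fromCollection(Seq(JoinTwo("key", Some(2))))

    val t1 = tEnv.fromDataStream(testStream1)
    val t2 = tEnv.fromDataStream(testStream2)

    // Inner join on the String keys only; the Option fields are just carried along.
    val result = t1.join(t2, "joinKeyOne = joinKeyTwo")
    // Converting the joined table to a DataStream fails with the ValidationException shown under "Result".
    result.toAppendStream[Row].print()

    sEnv.execute()
  }
}
{code}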
Result:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be used in a join operation because it does not implement a proper hashCode() method.
	at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:174)
	at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
	at org.apache.flink.table.typeutils.TypeCheckUtils$$anonfun$validateEqualsHashCode$1.apply$mcVI$sp(TypeCheckUtils.scala:149)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
	at org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:56)
	at org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:45)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:112)
{code}
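Judging by the stack trace, the check runs in the `NonWindowJoin` constructor and loops over the fields of the input row types, so it inspects every field, not just the join keys. The sketch below is a simplified illustration, assuming the check decides via reflection whether a type's class declares its own `hashCode()`; the helper `declaresOwnHashCode` is hypothetical and is not Flink's code. It shows why an `Option` field trips such a check: the static class `scala.Option` inherits `hashCode()` from `java.lang.Object`, even though the runtime subclass `Some` overrides it.

```scala
// Hypothetical, simplified re-creation of a reflection-based "proper hashCode()" check.
// Not Flink's implementation; it only illustrates why scala.Option is rejected.
object HashCodeCheck {

  // True if `c` is primitive, or if the public hashCode() found for `c`
  // (getMethod also searches inherited methods) is declared by some class
  // other than java.lang.Object.
  def declaresOwnHashCode(c: Class[_]): Boolean =
    c.isPrimitive || (c.getMethod("hashCode").getDeclaringClass ne classOf[Object])

  def main(args: Array[String]): Unit = {
    // The static type of an Option field is the abstract class scala.Option,
    // which inherits hashCode() from java.lang.Object.
    println(s"Option: ${declaresOwnHashCode(classOf[Option[_]])}")
    // The concrete case class Some does override hashCode().
    println(s"Some:   ${declaresOwnHashCode(classOf[Some[_]])}")
    // The String join keys pass as well.
    println(s"String: ${declaresOwnHashCode(classOf[String])}")
  }
}
```

Under this assumption the join keys (`String`) pass, while the non-key `Option[Int]` fields fail, which matches the reported message naming `scala.Option`.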
It seems this issue has been raised before for the streaming API: https://issues.apache.org/jira/browse/FLINK-2673

Expected behaviour: since the join condition does not reference the Option fields, the join should succeed and produce the following schema (this output was actually generated by result.printSchema()):
{code:java}
root
 |-- joinKeyOne: String
 |-- otherFieldOne: Option[Integer]
 |-- joinKeyTwo: String
 |-- otherFieldTwo: Option[Integer]
{code}
Actual behaviour: the runtime exception shown above is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)