[jira] [Created] (FLINK-11916) Join with a Temporal Table should throw exception for left join

Shang Yuanchun (Jira)
Hequn Cheng created FLINK-11916:
-----------------------------------

             Summary: Join with a Temporal Table should throw exception for left join
                 Key: FLINK-11916
                 URL: https://issues.apache.org/jira/browse/FLINK-11916
             Project: Flink
          Issue Type: Bug
          Components: API / Table SQL
            Reporter: Hequn Cheng


In {{TemporalJoinITCase.testProcessTimeInnerJoin}}, if we change the inner join to a left join, the test still runs without any error. If only inner join is supported for joins with a temporal table, we should throw an exception for left join.

CC [~pnowojski]

The problem can be reproduced with the following test (note the left join in the SQL query):
{code:scala}
@Test
  def testEventTimeInnerJoin(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)
    env.setStateBackend(getStateBackend)
    StreamITCase.clear
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val sqlQuery =
      """
        |SELECT
        |  o.amount * r.rate AS amount
        |FROM
        |  Orders AS o left join
        |  LATERAL TABLE (Rates(o.rowtime)) AS r on true
        |WHERE r.currency = o.currency
        |""".stripMargin


    val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
    ordersData.+=((2L, "Euro", new Timestamp(2L)))
    ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
    ordersData.+=((50L, "Yen", new Timestamp(4L)))
    ordersData.+=((3L, "Euro", new Timestamp(5L)))

    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))

    var expectedOutput = new mutable.HashSet[String]()
    expectedOutput += (2 * 114).toString
    expectedOutput += (3 * 116).toString

    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[Long, String]())
      .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
    val ratesHistory = env
      .fromCollection(ratesHistoryData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, Long]())
      .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("RatesHistory", ratesHistory)
    tEnv.registerTable("FilteredRatesHistory", tEnv.scan("RatesHistory").filter('rate > 110L))
    tEnv.registerFunction(
      "Rates",
      tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency))
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    // Scan from registered table to test for interplay between
    // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    assertEquals(expectedOutput, StreamITCase.testResults.toSet)
  }
{code}
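
A rough sketch of the kind of check proposed above, assuming only inner join is supported. It is deliberately self-contained: {{JoinKind}}, {{UnsupportedTemporalJoinException}} and {{TemporalJoinValidation}} are hypothetical names used for illustration and are not existing Flink planner classes. Where such a check would actually live in the planner is left open.

```scala
// Hypothetical, self-contained sketch. None of these types are Flink classes.
sealed trait JoinKind
case object InnerJoin extends JoinKind
case object LeftOuterJoin extends JoinKind

// Hypothetical exception type used to reject unsupported join kinds.
final class UnsupportedTemporalJoinException(message: String)
  extends RuntimeException(message)

object TemporalJoinValidation {

  /** Accepts an inner join and throws for every other join kind. */
  def validate(kind: JoinKind): Unit = kind match {
    case InnerJoin => ()
    case other =>
      throw new UnsupportedTemporalJoinException(
        s"Join with a temporal table only supports inner join, but was: $other")
  }
}
```

With a check like this, the left join in the query above would fail at validation time instead of running silently, e.g. {{TemporalJoinValidation.validate(LeftOuterJoin)}} throws, while {{TemporalJoinValidation.validate(InnerJoin)}} returns normally.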



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)