Hequn Cheng created FLINK-11916:
----------------------------------- Summary: Join with a Temporal Table should throw exception for left join Key: FLINK-11916 URL: https://issues.apache.org/jira/browse/FLINK-11916 Project: Flink Issue Type: Bug Components: API / Table SQL Reporter: Hequn Cheng In {{TemporalJoinITCase.testProcessTimeInnerJoin}}, if we change the inner join to a left join, the test works fine. We may need to throw an exception if we only support inner join. CC [~pnowojski] The problem can be reproduced with the following sql: {code:java} @Test def testEventTimeInnerJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) env.setStateBackend(getStateBackend) StreamITCase.clear env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ |SELECT | o.amount * r.rate AS amount |FROM | Orders AS o left join | LATERAL TABLE (Rates(o.rowtime)) AS r on true |WHERE r.currency = o.currency |""".stripMargin val ordersData = new mutable.MutableList[(Long, String, Timestamp)] ordersData.+=((2L, "Euro", new Timestamp(2L))) ordersData.+=((1L, "US Dollar", new Timestamp(3L))) ordersData.+=((50L, "Yen", new Timestamp(4L))) ordersData.+=((3L, "Euro", new Timestamp(5L))) val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)] ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L))) ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L))) ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L))) ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L))) ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L))) var expectedOutput = new mutable.HashSet[String]() expectedOutput += (2 * 114).toString expectedOutput += (3 * 116).toString val orders = env .fromCollection(ordersData) .assignTimestampsAndWatermarks(new TimestampExtractor[Long, String]()) .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime) val ratesHistory = env .fromCollection(ratesHistoryData) .assignTimestampsAndWatermarks(new 
TimestampExtractor[String, Long]()) .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime) tEnv.registerTable("Orders", orders) tEnv.registerTable("RatesHistory", ratesHistory) tEnv.registerTable("FilteredRatesHistory", tEnv.scan("RatesHistory").filter('rate > 110L)) tEnv.registerFunction( "Rates", tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency)) tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) // Scan from registered table to test for interplay between // LogicalCorrelateToTemporalTableJoinRule and TableScanRule val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() assertEquals(expectedOutput, StreamITCase.testResults.toSet) } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |