Caizhi Weng created FLINK-22730:
----------------------------------- Summary: Lookup join condition with CURRENT_DATE fails to filter records Key: FLINK-22730 URL: https://issues.apache.org/jira/browse/FLINK-22730 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.13.0, 1.12.0 Reporter: Caizhi Weng Add the following test case to org.apache.flink.table.api.TableEnvironmentITCase to reproduce this bug. {code:scala} @Test def myTest(): Unit = { val id1 = TestValuesTableFactory.registerData( Seq(Row.of("abc", LocalDateTime.of(2000, 1, 1, 0, 0)))) val ddl1 = s""" |CREATE TABLE Ta ( | id VARCHAR, | ts TIMESTAMP, | proc AS PROCTIME() |) WITH ( | 'connector' = 'values', | 'data-id' = '$id1', | 'bounded' = 'true' |) |""".stripMargin tEnv.executeSql(ddl1) val id2 = TestValuesTableFactory.registerData( Seq(Row.of("abc", LocalDateTime.of(2000, 1, 2, 0, 0)))) val ddl2 = s""" |CREATE TABLE Tb ( | id VARCHAR, | ts TIMESTAMP |) WITH ( | 'connector' = 'values', | 'data-id' = '$id2', | 'bounded' = 'true' |) |""".stripMargin tEnv.executeSql(ddl2) val it = tEnv.executeSql( """ |SELECT * FROM Ta AS t1 |INNER JOIN Tb FOR SYSTEM_TIME AS OF t1.proc AS t2 |ON t1.id = t2.id |WHERE CAST(coalesce(t1.ts, t2.ts) AS VARCHAR) >= CONCAT(CAST(CURRENT_DATE AS VARCHAR), ' 00:00:00') |""".stripMargin).collect() while (it.hasNext) { System.out.println(it.next()) } } {code} The result is {code} +I[abc, 2000-01-01T00:00, 2021-05-20T14:30:47.735Z, abc, 2000-01-02T00:00] {code} which is obviously incorrect. 
The generated operator is as follows {code:java} public class JoinTableFuncCollector$22 extends org.apache.flink.table.runtime.collector.TableFunctionCollector { org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2); org.apache.flink.table.data.utils.JoinedRowData joinedRow$9 = new org.apache.flink.table.data.utils.JoinedRowData(); private static final java.util.TimeZone timeZone = java.util.TimeZone.getTimeZone("Asia/Shanghai"); private org.apache.flink.table.data.TimestampData timestamp; private org.apache.flink.table.data.TimestampData localTimestamp; private int date; private final org.apache.flink.table.data.binary.BinaryStringData str$17 = org.apache.flink.table.data.binary.BinaryStringData.fromString(" 00:00:00"); public JoinTableFuncCollector$22(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public void collect(Object record) throws Exception { org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) getInput(); org.apache.flink.table.data.RowData in2 = (org.apache.flink.table.data.RowData) record; org.apache.flink.table.data.binary.BinaryStringData field$7; boolean isNull$7; org.apache.flink.table.data.TimestampData field$8; boolean isNull$8; org.apache.flink.table.data.TimestampData field$10; boolean isNull$10; boolean isNull$13; org.apache.flink.table.data.binary.BinaryStringData result$14; boolean isNull$15; org.apache.flink.table.data.binary.BinaryStringData result$16; boolean isNull$18; org.apache.flink.table.data.binary.BinaryStringData result$19; boolean isNull$20; boolean result$21; isNull$8 = in2.isNullAt(1); field$8 = null; if (!isNull$8) { field$8 = in2.getTimestamp(1, 6); } isNull$7 = in2.isNullAt(0); field$7 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$7) { field$7 = ((org.apache.flink.table.data.binary.BinaryStringData) 
in2.getString(0)); } isNull$10 = in1.isNullAt(1); field$10 = null; if (!isNull$10) { field$10 = in1.getTimestamp(1, 6); } boolean result$11 = !isNull$10; org.apache.flink.table.data.TimestampData result$12 = null; boolean isNull$12; if (result$11) { isNull$12 = isNull$10; if (!isNull$12) { result$12 = field$10; } } else { isNull$12 = isNull$8; if (!isNull$12) { result$12 = field$8; } } isNull$13 = isNull$12; result$14 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$13) { result$14 = org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.flink.table.runtime.functions.SqlDateTimeUtils.timestampToString(result$12, 6)); isNull$13 = (result$14 == null); } isNull$15 = false; result$16 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; if (!isNull$15) { result$16 = org.apache.flink.table.data.binary.BinaryStringData.fromString(org.apache.calcite.avatica.util.DateTimeUtils.unixDateToString(((int) date))); isNull$15 = (result$16 == null); } result$19 = org.apache.flink.table.data.binary.BinaryStringDataUtil.concat(( isNull$15 ) ? null : (result$16), ( false ) ? null : (((org.apache.flink.table.data.binary.BinaryStringData) str$17))); isNull$18 = (result$19 == null); if (isNull$18) { result$19 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; } isNull$20 = isNull$13 || isNull$18; result$21 = false; if (!isNull$20) { result$21 = ((result$14 == null) ? ((result$19 == null) ? 0 : -1) : ((result$19 == null) ? 1 : (result$14.compareTo(result$19)))) >= 0; } if (result$21) { if (isNull$7) { out.setField(0, null); } else { out.setField(0, field$7); } if (isNull$8) { out.setField(1, null); } else { out.setField(1, field$8); } joinedRow$9.replace(in1, out); joinedRow$9.setRowKind(in1.getRowKind()); outputResult(joinedRow$9); } } @Override public void close() throws Exception { } } {code} The member variable {{date}} is not initialized before use, thus causing this bug. 
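This is because {{LookupJoinCodeGenerator#generateTableFunctionCollectorForJoinTable}} does not emit {{${ctx.reusePerRecordCode()}}} when generating the {{collect}} method. As a result, the per-record assignment of {{date}} is never generated, so the field keeps its default value of 0 (which is rendered as 1970-01-01) and the condition compares against that date instead of the current date; this is why the 2000-01-01 record passes the filter. -- This message was sent by Atlassian Jira (v8.3.4#803005) |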
Free forum by Nabble | Edit this page |