Mulan created FLINK-18889:
----------------------------- Summary: New Async Table Function type inference fails Key: FLINK-18889 URL: https://issues.apache.org/jira/browse/FLINK-18889 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.1 Reporter: Mulan {code:java} @FunctionHint( input = @DataTypeHint("STRING"), output = @DataTypeHint("ROW<ct STRING>") ) public class RedisAsyncTableFunction extends AsyncTableFunction<Row> { private RedisClient redisClient; private StatefulRedisConnection<String, String> connection; private RedisKeyAsyncCommands<String, String> async; private static final String PREFIX = "redis://"; private static final String DEFAULT_DB = "0"; private static final String DEFAULT_URL = "localhost:6379"; private static final String DEFAULT_PASSWORD = ""; @Override public void open(FunctionContext context) throws Exception { final String url = DEFAULT_URL; final String password = DEFAULT_PASSWORD; final String database = DEFAULT_DB; StringBuilder redisUri = new StringBuilder(); redisUri.append(PREFIX).append(password).append(url).append("/").append(database); redisClient = RedisClient.create(redisUri.toString()); connection = redisClient.connect(); async = connection.async(); } public void eval(CompletableFuture<Collection<Row>> outputFuture, String key) { RedisFuture<Map<String, String>> redisFuture = ((RedisHashAsyncCommands) async).hgetall(key); redisFuture.thenAccept(new Consumer<Map<String, String>>() { @Override public void accept(Map<String, String> values) { int len = 1; Row row = new Row(len); row.setField(0, values.get("ct")); outputFuture.complete(Collections.singletonList(row)); } }); } @Override public void close() throws Exception { if (connection != null){ connection.close(); } if (redisClient != null){ redisClient.shutdown(); } } } {code} {code:java} tEnv.createTemporarySystemFunction("lookup_redis", RedisAsyncTableFunction.class); {code} {code:java} Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 3, column 31 to line 3, column 48: No match found for function signature lookup_redis(<CHARACTER>) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at hua.mulan.slink.SqlSubmit.callInsertInto(SqlSubmit.java:100) at hua.mulan.slink.SqlSubmit.callCommand(SqlSubmit.java:75) at hua.mulan.slink.SqlSubmit.run(SqlSubmit.java:57) at hua.mulan.slink.SqlSubmit.main(SqlSubmit.java:38) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 31 to line 3, column 48: No match found for function signature lookup_redis(<CHARACTER>) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) 
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 10 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature lookup_redis(<CHARACTER>) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ... 38 more {code} Is this similar to the following issue? https://issues.apache.org/jira/browse/FLINK-18520 -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |