Hi All
I want to use a custom catalog named "ca1" and create a database under it. When I submit the SQL, it raises the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
    at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
    at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
    at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
    at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
    at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
    ... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
    ... 26 more

It seems that Calcite cannot find the source object. After debugging the code, I found that tableEnv.registerTableSource and registerTableSink always use a built-in catalog with a hard-coded catalog name (default_catalog) and database name (default_database), and that tableEnv.registerCatalog cannot change this behavior. Is this behavior reasonable? If I don't want to use the default built-in catalog and database, is there any other way to do this?

    GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
    tableEnv.registerCatalog(catalog.getName(), catalog); // This does not change the built-in catalog!
    tableEnv.useCatalog(catalog.getName());
    catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
    tableEnv.useDatabase("db1");

    tableEnv.connect(sourceKafka)
        .withFormat(csv)
        .withSchema(schema2)
        .inAppendMode()
        .registerTableSource("orderstream");

    tableEnv.connect(sinkKafka)
        .withFormat(csv)
        .withSchema(schema2)
        .inAppendMode()
        .registerTableSink("sinkstream");

    String sql = "insert into ca1.db1.sinkstream "
        + "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream "
        + "group by tumble(ts, INTERVAL '5' SECOND), data";

    tableEnv.sqlUpdate(sql);
    tableEnv.execute("test");

Thanks,
Simon

Hi Simon,
Thanks for reporting the problem. There are some rough edges around the catalog API and table environments, and we are improving them after the 1.9 release.

Nevertheless, tableEnv.registerCatalog() just puts a new catalog into Flink's CatalogManager. It doesn't change the default catalog/database as you expected. To switch to your newly registered catalog, you could call tableEnv.useCatalog() and .useDatabase().

As an alternative, you could fully qualify your table name with a "catalog.db.table" syntax without switching the current catalog/database.

Please try those and let me know if you find new problems.

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 12:38 AM Simon Su <[hidden email]> wrote:
> [...]

--
Xuefu Zhang
"In Honey We Trust!"

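The two options above (switching the current catalog/database, or fully qualifying the table name) can be pictured with a small toy model. This is only a sketch: TablePathResolver and its methods are hypothetical names invented for illustration, not Flink classes, and the expansion rule is an assumption about how a one-, two-, or three-part identifier is completed from the current catalog and database.

```java
// Toy model only: TablePathResolver is a hypothetical helper, not part of Flink.
final class TablePathResolver {
    private String currentCatalog;
    private String currentDatabase;

    TablePathResolver(String initialCatalog, String initialDatabase) {
        this.currentCatalog = initialCatalog;
        this.currentDatabase = initialDatabase;
    }

    // Mirrors the idea of tableEnv.useCatalog(): only changes the lookup default.
    void useCatalog(String catalog) {
        this.currentCatalog = catalog;
    }

    // Mirrors the idea of tableEnv.useDatabase(): only changes the lookup default.
    void useDatabase(String database) {
        this.currentDatabase = database;
    }

    // Expands "table", "db.table" or "catalog.db.table" to a three-part path.
    String qualify(String identifier) {
        String[] parts = identifier.split("\\.");
        switch (parts.length) {
            case 1:
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2:
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3:
                return identifier; // already fully qualified, defaults are ignored
            default:
                throw new IllegalArgumentException("Unexpected identifier: " + identifier);
        }
    }
}
```

In this model, a short name such as "orderstream" resolves against whatever catalog and database are current, while "ca1.db1.orderstream" resolves the same way regardless of them. Neither option changes where a table was stored when it was registered, which is the point the rest of the thread turns on.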
Hi Xuefu
Thanks for your reply.

Actually, I have already tried what you advised: I called tableEnv.useCatalog and useDatabase, and I also tried using "catalogname.databasename.tableName" in SQL. I think the root cause is that tableEnv.registerTableSource always uses a "built-in" catalog and database rather than the custom one. So if I want to use a custom one, I have to write code like this:

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
        EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .withBuiltInCatalogName("ca1")
            .withBuiltInDatabaseName("db1")
            .build());

As Dawid said, if I want to store a table in my custom catalog, I can call catalog.createTable or use DDL.

Thanks,
Simon

On 08/13/2019 02:55, Xuefu Z <[hidden email]> wrote:
> [...]

Yes, tableEnv.registerTable(_) etc. always register in the default catalog. To create a table in your custom catalog, you could use tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <[hidden email]> wrote:
> [...]

--
Xuefu Zhang
"In Honey We Trust!"

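The behavior described in this thread can be sketched with a toy model. ToyCatalogManager and its methods are hypothetical names made up for illustration and do not reproduce Flink's CatalogManager. The model only assumes what the thread states: the registerTable-style calls always write to the built-in catalog and database, while an explicit path (as with catalog.createTable or DDL) writes to the named catalog.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: a hypothetical stand-in, not Flink's CatalogManager.
final class ToyCatalogManager {
    private final String builtInCatalog;
    private final String builtInDatabase;
    // Key is "catalog.database", value is the set of table names stored there.
    private final Map<String, Set<String>> tables = new HashMap<>();

    ToyCatalogManager(String builtInCatalog, String builtInDatabase) {
        this.builtInCatalog = builtInCatalog;
        this.builtInDatabase = builtInDatabase;
    }

    // Imitates registerTableSource/registerTableSink as described in the thread:
    // the table always lands in the built-in catalog and database.
    void registerTable(String name) {
        createTable(builtInCatalog, builtInDatabase, name);
    }

    // Imitates creating a table under an explicit catalog and database.
    void createTable(String catalog, String database, String name) {
        tables.computeIfAbsent(catalog + "." + database, key -> new HashSet<>()).add(name);
    }

    // Looks a table up by its fully qualified path.
    boolean exists(String catalog, String database, String name) {
        Set<String> names = tables.get(catalog + "." + database);
        return names != null && names.contains(name);
    }
}
```

With the built-in names left at "default_catalog" and "default_database", registerTable("orderstream") followed by a lookup in "ca1"/"db1" finds nothing, which matches the "not found within 'ca1.db1'" error. Constructing the model with "ca1" and "db1" as the built-in names corresponds to the withBuiltInCatalogName/withBuiltInDatabaseName workaround, and calling createTable("ca1", "db1", ...) corresponds to the DDL route.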
I think we might need to improve the javadoc of tableEnv.registerTableSource/registerTableSink. Currently, the comment says "Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog." But which catalog? The current one or the default in-memory one? I think it would be better to improve the description and add a NOTE to it.

Regards,
Jark

On Tue, 13 Aug 2019 at 10:52, Xuefu Z <[hidden email]> wrote:
> [...]

Hi Jark
Thanks for your reply. It's odd that tableEnv provides an API called "registerCatalog" that has no effect in some cases (like mine). Do you think it's feasible to unify this behavior? I agree the documentation is needed, but a unified way to use tableEnv is also important.

Thanks,
Simon
Hi Simon,
This is a temporary workaround for 1.9 release. We will fix the behavior in 1.10, see FLINK-13461.

Regards,
Jark
OK, Thanks Jark
Thanks,
Simon