Flink cannot recognize catalog set by registerCatalog.


Flink cannot recognize catalog set by registerCatalog.

Simon Su
Hi All,
    I want to use a custom catalog named “ca1” and create a database under it. When I submit the SQL, it raises the following error:


    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 26 more
   
It seems that Calcite cannot find the source table as expected. After debugging the code I found that when using tableEnv.registerTableSource or registerTableSink, Flink always uses a built-in catalog with a hard-coded catalog name (default-catalog) and database name (default_database), and tableEnv.registerCatalog cannot change this behavior. Is this the intended behavior? If I don’t want to use the default built-in catalog and database, is there any other way to do this?


GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
tableEnv.registerCatalog(catalog.getName(), catalog); // does not change the built-in catalog!
tableEnv.useCatalog(catalog.getName());
catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
tableEnv.useDatabase("db1");

tableEnv.connect(sourceKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSource("orderstream");

tableEnv.connect(sinkKafka)
    .withFormat(csv)
    .withSchema(schema2)
    .inAppendMode()
    .registerTableSink("sinkstream");

String sql = "insert into ca1.db1.sinkstream " +
    "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
    "group by tumble(ts, INTERVAL '5' SECOND), data";

tableEnv.sqlUpdate(sql);
tableEnv.execute("test");


Thanks,
Simon


Re: Flink cannot recognize catalog set by registerCatalog.

Xuefu Z
Hi Simon,

Thanks for reporting the problem. There are some rough edges around the catalog
API and table environments, and we are improving them after the 1.9 release.

Nevertheless, tableEnv.registerCatalog() only puts a new catalog into
Flink's CatalogManager; it doesn't change the default catalog/database as
you expected. To switch to your newly registered catalog, you could call
tableEnv.useCatalog() and .useDatabase().

As an alternative, you could fully qualify your table name with a
"catalog.db.table" syntax without switching current catalog/database.

Please try those and let me know if you find new problems.

Thanks,
Xuefu




--
Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognize catalog set by registerCatalog.

Simon Su
Hi Xuefu,

Thanks for your reply.

Actually I have already tried your suggestions: I called tableEnv.useCatalog and useDatabase, and I also tried using “catalogname.databasename.tableName” in the SQL. I think the root cause is that when I call tableEnv.registerTableSource, it always uses the built-in catalog and database rather than the custom one. So if I want to use a custom one, I have to write code like this:


StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
    EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .withBuiltInCatalogName("ca1")
        .withBuiltInDatabaseName("db1")
        .build());


As Dawid said, if I want to store the table in my custom catalog, I can call catalog.createTable or use DDL.
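
For example, something along these lines should put the table definition directly into the custom catalog via the catalog API (a rough sketch against the 1.9 API; the schema and the connector/format properties below are only placeholders, untested):

    // register "orderstream" in ca1.db1 through the catalog instead of registerTableSource
    TableSchema schema = TableSchema.builder()
        .field("data", DataTypes.STRING())
        .field("ts", DataTypes.TIMESTAMP(3))
        .build();
    Map<String, String> properties = new HashMap<>(); // Kafka/CSV connector and format properties go here
    CatalogTable table = new CatalogTableImpl(schema, properties, "order stream");
    catalog.createTable(new ObjectPath("db1", "orderstream"), table, false);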


Thanks,
Simon



Re: Flink cannot recognize catalog set by registerCatalog.

Xuefu Z
Yes, tableEnv.registerTable() and related methods always register in the default catalog.
To create a table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").
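
Roughly like the following (only a sketch: the column types are illustrative, and the WITH clause needs the actual connector and format properties for the Kafka source, which I left out):

    tableEnv.useCatalog("ca1");
    tableEnv.useDatabase("db1");
    tableEnv.sqlUpdate(
        "create table orderstream (" +
        "  data string," +
        "  ts timestamp(3)" +
        ") with (" +
        "  'connector.type' = 'kafka'" +
        "  /* remaining connector and format properties */" +
        ")");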

Thanks,
Xuefu


--
Xuefu Zhang

"In Honey We Trust!"

Re: Flink cannot recognize catalog set by registerCatalog.

Jark Wu-2
I think we might need to improve the javadoc of
tableEnv.registerTableSource/registerTableSink. Currently, the comment says:

"Registers an external TableSink with already configured field names and
field types in this TableEnvironment's catalog."

But which catalog? The current one or the default in-memory one?
I think it would be better to improve the description and add a NOTE to it.

Regards,
Jark


Re: Flink cannot recognize catalog set by registerCatalog.

Simon Su
Hi Jark,

Thanks for your reply.

It’s weird that tableEnv provides an API called “registerCatalog”, but it does not take effect in some cases (like mine).
Do you think it’s feasible to unify this behavior? I think the documentation is necessary, but a unified way to use tableEnv is also important.


Thanks,
Simon



Re: Flink cannot recognize catalog set by registerCatalog.

Jark Wu-2
Hi Simon,

This is a temporary workaround for the 1.9 release. We will fix the behavior in
1.10, see FLINK-13461.

Regards,
Jark


Re: Flink cannot recognize catalog set by registerCatalog.

Simon Su
OK, Thanks Jark


Thanks,
Simon

