[jira] [Created] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory


Shang Yuanchun (Jira)
Terry Wang created FLINK-15552:
----------------------------------

             Summary: SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
                 Key: FLINK-15552
                 URL: https://issues.apache.org/jira/browse/FLINK-15552
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Client, Table SQL / Runtime
            Reporter: Terry Wang


How to Reproduce:
First, I start the SQL Client and use `-l` to point to a Kafka connector directory.

`
bin/sql-client.sh embedded -l /xx/connectors/kafka/
`

Then I create a Kafka table as follows:
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from the newly created table, and an exception is thrown:

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Currently `TableFactoryUtil#findAndCreateTableSource` is used to convert a CatalogTable to a TableSource, but when it calls `TableFactoryService.find` the current class loader is not passed to that method. The default loader is then the BootStrapClassLoader, which cannot find our factory.
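
To illustrate the suspected mechanism, here is a minimal sketch in plain Java with no Flink dependency. It assumes the factory lookup behaves like a `java.util.ServiceLoader` lookup, where a provider is found only if the class loader used for the lookup can see its `META-INF/services` registration. `DemoFactory`, `DemoKafkaFactory`, `discover` and `libraryLoader` are hypothetical names invented for this example. They are not Flink classes, and this is not the actual fix.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class ClassLoaderDiscoveryDemo {

    /** Hypothetical stand-in for a table factory SPI (not a Flink interface). */
    public interface DemoFactory {
        String connectorType();
    }

    /** Hypothetical stand-in for a Kafka factory registered only in a library directory. */
    public static class DemoKafkaFactory implements DemoFactory {
        @Override
        public String connectorType() {
            return "kafka";
        }
    }

    /** Returns the connector types of all factories discoverable through the given loader. */
    static List<String> discover(ClassLoader loader) {
        List<String> types = new ArrayList<>();
        for (DemoFactory factory : ServiceLoader.load(DemoFactory.class, loader)) {
            types.add(factory.connectorType());
        }
        return types;
    }

    /**
     * Builds a loader that additionally sees a temporary directory holding the
     * service registration file, loosely mimicking a directory passed via -l.
     */
    static ClassLoader libraryLoader() {
        try {
            Path dir = Files.createTempDirectory("library");
            Path services = Files.createDirectories(dir.resolve("META-INF").resolve("services"));
            // The registration file is named after the service interface and lists the provider.
            Files.write(services.resolve(DemoFactory.class.getName()),
                    DemoKafkaFactory.class.getName().getBytes(StandardCharsets.UTF_8));
            return new URLClassLoader(new URL[] {dir.toUri().toURL()},
                    ClassLoaderDiscoveryDemo.class.getClassLoader());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The default loader does not see the registration, so nothing is found.
        System.out.println("default loader: " + discover(ClassLoaderDiscoveryDemo.class.getClassLoader()));
        // The loader that includes the library directory finds the factory.
        System.out.println("library loader: " + discover(libraryLoader()));
    }
}
```

In this sketch the lookup through the default loader returns no factories, while the lookup through the loader that includes the library directory returns the `kafka` factory. If the same mechanism applies here, passing the user class loader down to the factory lookup would let the Kafka factory be considered.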



--
This message was sent by Atlassian Jira
(v8.3.4#803005)