[jira] [Created] (FLINK-14994) StreamTableEnvironment.connect throw exception when using "FileSystem" connector and "CSV" format

Shang Yuanchun (Jira)
Dezhi Cai created FLINK-14994:
---------------------------------

             Summary: StreamTableEnvironment.connect throw exception when using "FileSystem" connector and "CSV" format
                 Key: FLINK-14994
                 URL: https://issues.apache.org/jira/browse/FLINK-14994
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem, Table SQL / API
    Affects Versions: 1.9.1, 1.9.0
            Reporter: Dezhi Cai
         Attachments: image-2019-11-29-22-50-50-367.png

I used two approaches to register a table in StreamTableEnvironment. The DDL approach runs fine, but the "StreamTableEnvironment.connect" approach throws an exception.

{color:#FF0000}Root cause:{color}

"CsvTableSourceFactoryBase.supportedProperties" doesn't include "format.schema", so "org.apache.flink.table.factories.TableFactoryService#filterBySupportedProperties" returns no "TableSourceFactory".
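
To illustrate the root cause, here is a minimal standalone sketch of a supported-properties check. It is not Flink's actual implementation: the class SupportedPropertiesSketch, its methods, the "#" index normalization, and the key lists are simplified assumptions made for illustration only. The point it shows is that a single undeclared key such as "format.schema" is enough for a factory to be rejected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of a supported-properties check; not Flink's code. */
final class SupportedPropertiesSketch {

    /** Replaces numeric index segments with '#', e.g. "schema.0.name" -> "schema.#.name". */
    static String normalize(String key) {
        return key.replaceAll("\\.\\d+", ".#");
    }

    /** Returns the property keys that the factory does not declare as supported. */
    static List<String> unsupportedKeys(Map<String, String> properties, List<String> supported) {
        List<String> unsupported = new ArrayList<>();
        for (String key : properties.keySet()) {
            if (!supported.contains(normalize(key))) {
                unsupported.add(key);
            }
        }
        return unsupported;
    }

    public static void main(String[] args) {
        // Illustrative subset of keys a CSV source factory might declare (assumption).
        List<String> supported = Arrays.asList(
                "connector.path",
                "format.fields.#.name",
                "format.fields.#.type",
                "schema.#.name",
                "schema.#.type");

        // Keys resembling the ones given in the DDL statement.
        Map<String, String> ddl = new LinkedHashMap<>();
        ddl.put("connector.path", "C:/work/1.csv");
        ddl.put("format.fields.0.name", "last_update_dt");
        ddl.put("format.fields.0.type", "TIMESTAMP");

        // Keys resembling the ones the connect() descriptors produce (assumption).
        Map<String, String> descriptor = new LinkedHashMap<>();
        descriptor.put("connector.path", "c:/work/1.csv");
        descriptor.put("format.schema", "ROW<f0 TIMESTAMP>");
        descriptor.put("schema.0.name", "last_update_dt");
        descriptor.put("schema.0.type", "TIMESTAMP");

        System.out.println("DDL unsupported keys: " + unsupportedKeys(ddl, supported));
        System.out.println("connect() unsupported keys: " + unsupportedKeys(descriptor, supported));
    }
}
```

In this sketch the DDL-style properties leave no unsupported keys, while the descriptor-style properties leave "format.schema" unmatched, which is the situation in which the factory lookup comes back empty.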

 

This approach runs successfully (using DDL):
{code:java}
public static void main1(String[] args) throws Exception {
    // Set up a streaming table environment backed by the Blink planner.
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);

    // Register the table via DDL; the format is described with 'format.fields.#' properties.
    String sql = "create table test(last_update_dt TIMESTAMP) " +
                    "with (" +
                        "'connector.type' = 'filesystem'," +
                        "'connector.path' = 'C:/work/1.csv'," +
                        "'format.type' = 'csv'," +
                        "'format.fields.0.name' = 'last_update_dt'," +
                        "'format.fields.0.type' = 'TIMESTAMP'" +
                    ")";
    streamTableEnvironment.sqlUpdate(sql);

    // Query the registered table and print the rows.
    Table data = streamTableEnvironment.sqlQuery("select * from test");
    streamTableEnvironment.toAppendStream(data, Row.class).print();
    environment.execute();
}{code}
 

This approach throws an exception:
{code:java}
public static void main(String[] args) throws Exception {
    // Same environment setup as in the DDL example.
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);

    // Register the table via descriptors; Csv.schema(...) is what yields the "format.schema" property.
    streamTableEnvironment.connect(
            new FileSystem()
            .path("c:/work/1.csv")
    ).withFormat(
            new Csv()
            .schema(Types.ROW(Types.SQL_TIMESTAMP))
    ).withSchema(
            new Schema()
            .field("last_update_dt", Types.SQL_TIMESTAMP)
    ).inAppendMode().registerTableSource("test");

    // Query the registered table and print the rows.
    Table data = streamTableEnvironment.sqlQuery("select * from test");
    streamTableEnvironment.toAppendStream(data, Row.class).print();

    environment.execute();
}
{code}
  !image-2019-11-29-22-50-50-367.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)