Dezhi Cai created FLINK-14994:
--------------------------------- Summary: StreamTableEnvironment.connect throw exception when using "FileSystem" connector and "CSV" format Key: FLINK-14994 URL: https://issues.apache.org/jira/browse/FLINK-14994 Project: Flink Issue Type: Bug Components: Connectors / FileSystem, Table SQL / API Affects Versions: 1.9.1, 1.9.0 Reporter: Dezhi Cai Attachments: image-2019-11-29-22-50-50-367.png I use two approaches to register table in StreamTableEnvironment. The DDL approach run fine and the "StreamTableEnvironment.connect" one throw exception. {color:#FF0000}the root cause :{color} "CsvTableSourceFactoryBase.supportedProperties" does't include "format.schema", it cause that "org.apache.flink.table.factories.TableFactoryService#filterBySupportedProperties" return no "TableSourceFactory" this approach run successfully (using DDL) {code:java} public static void main1(String[] args) throws Exception{ StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings); String sql = "create table test(last_update_dt TIMESTAMP) " + "with (" + "'connector.type' = 'filesystem'," + "'connector.path' = 'C:/work/1.csv'," + "'format.type' = 'csv'," + "'format.fields.0.name' = 'last_update_dt'," + "'format.fields.0.type' = 'TIMESTAMP'" + ")"; streamTableEnvironment.sqlUpdate(sql); Table data = streamTableEnvironment.sqlQuery("select * from test"); streamTableEnvironment.toAppendStream(data, Row.class).print(); environment.execute(); }{code} this approach throw Exception {code:java} public static void main(String[] args) throws Exception{ StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings); streamTableEnvironment.connect( new FileSystem() .path("c:/work/1.csv") ).withFormat( new Csv() .schema(Types.ROW(Types.SQL_TIMESTAMP)) ).withSchema( new Schema() .field("last_update_dt", Types.SQL_TIMESTAMP) ).inAppendMode().registerTableSource("test"); Table data = streamTableEnvironment.sqlQuery("select * from test"); streamTableEnvironment.toAppendStream(data, Row.class).print(); environment.execute(); } {code} !image-2019-11-29-22-50-50-367.png! -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |