Jingsong Lee created FLINK-15912:
------------------------------------

             Summary: Add Context to improve TableSourceFactory and TableSinkFactory
                 Key: FLINK-15912
                 URL: https://issues.apache.org/jira/browse/FLINK-15912
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
            Reporter: Jingsong Lee
             Fix For: 1.11.0


Discussion in: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-TableFactory-td36647.html]

Vote in: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Improve-TableFactory-to-add-Context-td37211.html]

Motivation:

The main needs and problems today are:
 * A connector can't get the TableConfig [1], yet some behaviors really need to be controlled by the user's table configuration. In the era of catalogs, we can't put these settings in the connector properties, which is too inconvenient.
 * A context class also allows for future modifications without touching the TableFactory interface again.

Interface:
{code:java}
public interface TableSourceFactory<T> extends TableFactory {

    ......

    /**
     * Creates and configures a {@link TableSource} based on the given {@link Context}.
     *
     * @param context context of this table source.
     * @return the configured table source.
     */
    default TableSource<T> createTableSource(Context context) {
        return createTableSource(
                context.getObjectIdentifier().toObjectPath(),
                context.getTable());
    }

    /**
     * Context of table source creation. Contains table information and environment information.
     */
    interface Context {

        /**
         * @return full identifier of the given {@link CatalogTable}.
         */
        ObjectIdentifier getObjectIdentifier();

        /**
         * @return table {@link CatalogTable} instance.
         */
        CatalogTable getTable();

        /**
         * @return readable config of this table environment.
         */
        ReadableConfig getConfiguration();
    }
}

public interface TableSinkFactory<T> extends TableFactory {

    ......

    /**
     * Creates and configures a {@link TableSink} based on the given {@link Context}.
     *
     * @param context context of this table sink.
     * @return the configured table sink.
     */
    default TableSink<T> createTableSink(Context context) {
        return createTableSink(
                context.getObjectIdentifier().toObjectPath(),
                context.getTable());
    }

    /**
     * Context of table sink creation. Contains table information and environment information.
     */
    interface Context {

        /**
         * @return full identifier of the given {@link CatalogTable}.
         */
        ObjectIdentifier getObjectIdentifier();

        /**
         * @return table {@link CatalogTable} instance.
         */
        CatalogTable getTable();

        /**
         * @return readable config of this table environment.
         */
        ReadableConfig getConfiguration();
    }
}
{code}
A Context inner interface is added to both TableSourceFactory and TableSinkFactory. It is defined separately in each because sources and sinks may need different properties in the future.

[1] https://issues.apache.org/jira/browse/FLINK-15290



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
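
For illustration only, here is a minimal sketch of how a connector factory might use the proposed Context to combine per-table properties with the table environment configuration. To stay self-contained it does not depend on Flink: the nested ReadableConfig and Context interfaces are simplified stand-ins (string keys instead of ConfigOption, a plain String instead of ObjectIdentifier, a property map instead of CatalogTable), and DemoSource, DemoSourceFactory, contextOf, and the keys "connector.path" and "demo.source.batch-size" are hypothetical names that do not come from the issue.

```java
import java.util.Map;
import java.util.Optional;

public class ContextSketch {

    /** Simplified stand-in for Flink's ReadableConfig; string keys are an assumption. */
    interface ReadableConfig {
        Optional<String> getOptional(String key);
    }

    /** Simplified stand-in for the proposed Context interface. */
    interface Context {
        String getObjectIdentifier();              // stands in for ObjectIdentifier
        Map<String, String> getTableProperties();  // stands in for CatalogTable
        ReadableConfig getConfiguration();
    }

    /** Hypothetical source that only records what it was configured with. */
    static final class DemoSource {
        final String identifier;
        final String path;
        final int batchSize;

        DemoSource(String identifier, String path, int batchSize) {
            this.identifier = identifier;
            this.path = path;
            this.batchSize = batchSize;
        }
    }

    /** Hypothetical factory showing where each piece of information comes from. */
    static final class DemoSourceFactory {
        static final String BATCH_SIZE_KEY = "demo.source.batch-size"; // hypothetical key
        static final int DEFAULT_BATCH_SIZE = 1024;

        DemoSource createTableSource(Context context) {
            // Per-table settings still come from the table's own properties.
            String path = context.getTableProperties().get("connector.path");
            // Session-wide behavior comes from the table environment configuration,
            // so it does not have to be repeated in every table's properties.
            int batchSize = context.getConfiguration()
                    .getOptional(BATCH_SIZE_KEY)
                    .map(Integer::parseInt)
                    .orElse(DEFAULT_BATCH_SIZE);
            return new DemoSource(context.getObjectIdentifier(), path, batchSize);
        }
    }

    /** Builds a Context backed by plain maps, for demonstration purposes. */
    static Context contextOf(
            String identifier, Map<String, String> tableProps, Map<String, String> config) {
        return new Context() {
            @Override
            public String getObjectIdentifier() {
                return identifier;
            }

            @Override
            public Map<String, String> getTableProperties() {
                return tableProps;
            }

            @Override
            public ReadableConfig getConfiguration() {
                return key -> Optional.ofNullable(config.get(key));
            }
        };
    }

    public static void main(String[] args) {
        Context context = contextOf(
                "cat.db.orders",
                Map.of("connector.path", "/tmp/orders"),
                Map.of(DemoSourceFactory.BATCH_SIZE_KEY, "256"));
        DemoSource source = new DemoSourceFactory().createTableSource(context);
        System.out.println(source.identifier + " " + source.path + " " + source.batchSize);
    }
}
```

Because the factory receives a single Context argument, a further accessor could later be added to Context without changing the createTableSource signature, which is the second motivation listed in the issue.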