Hi Team,
I am trying out Flink 1.11 following the official docs and examples. My code and pom are below. While executing the job I get the following error:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.properties.auto.offset.reset=latest
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testflinktopurl
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=badbotstream
connector.type=kafka
connector.version=0.11
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=sid
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=_zpsbd2
schema.10.data-type=VARCHAR(2147483647)
schema.10.name=noqryurl
schema.11.data-type=VARCHAR(2147483647)
schema.11.name=_zpsbd8
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=_zpsbd3
schema.3.data-type=VARCHAR(2147483647)
schema.3.name=_zpsbd6
schema.4.data-type=VARCHAR(2147483647)
schema.4.name=_zpsbd7
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=sign
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=rule
schema.7.data-type=VARCHAR(2147483647)
schema.7.name=_blockdigest
schema.8.data-type=VARCHAR(2147483647)
schema.8.name=recvdTime
schema.9.data-type=VARCHAR(2147483647)
schema.9.name=botcode
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory

    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:162) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:107) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:91) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at java.util.Optional.map(Optional.java:215) ~[?:1.8.0_265]
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:82) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) ~[flink-table_2.11-1.11.2.jar:1.11.2]
    at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) ~[flink-table_2.11-1.11.2.jar:1.11.2]
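Reading the trace, the only Kafka factory that was considered is org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory, which comes from the flink-connector-kafka_2.11 dependency in my pom. As far as I understand, that artifact is the universal Kafka connector and its factory only matches connector.version=universal, while the 0.11-specific factory ships in flink-connector-kafka-0.11_2.11, which I do not depend on. Is the fix simply to declare the universal version in the descriptor? A minimal sketch of what I mean (untested):

fsTableEnv.connect(new Kafka()
        .version("universal") // matches flink-connector-kafka_2.11; "0.11" would need flink-connector-kafka-0.11_2.11 in the pom
        .topic("badbotstream")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "testflinktopurl")
        .startFromLatest())
    .withFormat(new Json().failOnMissingField(false))
    // schema unchanged from the code below
    ...

Or should I instead swap the pom dependency to flink-connector-kafka-0.11_2.11 and keep .version("0.11")? My full job and pom follow.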
StreamingJob.java (main class)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings); // Model 1
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

// Register the Kafka topic as a temporary table via the legacy connect() descriptor API
fsTableEnv.connect(new Kafka()
        .version("0.11")
        .topic("badbotstream")
        .property("auto.offset.reset", "latest")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "testflinktopurl")
        .startFromLatest())
    .withFormat(new Json().failOnMissingField(false))
    .withSchema(new Schema()
        .field("sid", DataTypes.STRING())
        .field("_zpsbd2", DataTypes.STRING())
        .field("_zpsbd3", DataTypes.STRING())
        .field("_zpsbd6", DataTypes.STRING())
        .field("_zpsbd7", DataTypes.STRING())
        .field("sign", DataTypes.STRING())
        .field("rule", DataTypes.STRING())
        .field("_blockdigest", DataTypes.STRING())
        .field("recvdTime", DataTypes.STRING())
        .field("botcode", DataTypes.STRING())
        .field("noqryurl", DataTypes.STRING())
        .field("_zpsbd8", DataTypes.STRING()))
    .inAppendMode()
    .createTemporaryTable("badbot");

// TableResult dsRow = fsTableEnv.executeSql("select * from badbot where sid = '4859'"); // Model 1
Table badBot = fsTableEnv.sqlQuery("select * from badbot where sid = '4859'");
DataStream<Row> dsRow = fsTableEnv.toAppendStream(badBot, Row.class);
dsRow.print();
fsEnv.execute("pacifier");

Pom.xml

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
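Alternatively, would switching to the blink planner and defining the source with DDL avoid the legacy factory lookup altogether? A sketch of what I have in mind, untested and assuming flink-table-planner-blink_2.11 is added to the pom (only the first two columns shown):

EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(fsEnv, blinkSettings);

// New-style source definition; 'connector' = 'kafka' should resolve to the
// universal connector's factory already on my classpath
tEnv.executeSql(
    "CREATE TABLE badbot (" +
    "  sid STRING," +
    "  _zpsbd2 STRING" + // remaining columns elided for brevity
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'badbotstream'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'testflinktopurl'," +
    "  'scan.startup.mode' = 'latest-offset'," +
    "  'format' = 'json'," +
    "  'json.fail-on-missing-field' = 'false'" +
    ")");

Any pointers on which of the two routes is the intended one for 1.11 would be much appreciated.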