mzz created FLINK-18184:
--------------------------- Summary: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' Key: FLINK-18184 URL: https://issues.apache.org/jira/browse/FLINK-18184 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.9.1 Environment: local:macos flink1.9 Reporter: mzz val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(5000) // checkpoint every 5000 msecs //kafak配置 val properties = new Properties() properties.setProperty("bootstrap.servers", "172.16.30.207:9092") properties.setProperty("group.id", "km_aggs_group") val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new SimpleStringSchema(), properties).setStartFromEarliest() // val source = env.addSource(kafkaConsumer) val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings) streamTableEnvironment.connect(new Kafka() .topic(TOPIC) .version(VERSION) .startFromEarliest() .property("bootstrap.servers", "172.16.30.207:9092") .property("zookeeper.connect", "172.16.30.207:2181") .property("group.id", "km_aggs_group_table") // .properties(properties) ) .withFormat( new Json() .failOnMissingField(true) .deriveSchema() ) .withSchema(new Schema() .field("advs", Types.STRING()) .field("devs", Types.STRING()) .field("environment", Types.STRING()) .field("events", Types.STRING()) .field("identity", Types.STRING()) .field("ip", Types.STRING()) .field("launchs", Types.STRING()) .field("ts", Types.STRING()) ) .inAppendMode() .registerTableSource("aggs_test") val tableResult = streamTableEnvironment.sqlQuery("select * from aggs_test") tableResult.printSchema() // streamTableEnvironment.toAppendStream[Row](tableResult).print() //启动程序 env.execute("test_kafka") -------------------------------------------------------- erroe message: Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed. at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69) at KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70) at KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: No context matches. The following properties are requested: connector.properties.0.key=zookeeper.connect connector.properties.0.value=172.16.30.207:2181 connector.properties.1.key=group.id connector.properties.1.value=km_aggs_group_table connector.properties.2.key=bootstrap.servers connector.properties.2.value=172.16.30.207:9092 connector.property-version=1 connector.startup-mode=earliest-offset connector.topic=aggs_topic connector.type=kafka connector.version=2.0 format.derive-schema=true format.fail-on-missing-field=true format.property-version=1 format.type=json schema.0.name=advs schema.0.type=VARCHAR schema.1.name=devs schema.1.type=VARCHAR schema.2.name=environment schema.2.type=VARCHAR schema.3.name=events schema.3.type=VARCHAR schema.4.name=identity schema.4.type=VARCHAR schema.5.name=ip schema.5.type=VARCHAR schema.6.name=launchs schema.6.type=VARCHAR schema.7.name=ts schema.7.type=VARCHAR update-mode=append The following factories have been considered: org.apache.flink.formats.json.JsonRowFormatFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.planner.StreamPlannerFactory org.apache.flink.table.executor.StreamExecutorFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.table.catalog.GenericInMemoryCatalogFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64) ... 4 more I've tried these way,Didn't solve my problem。[https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto|https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto] Anyone help me ,THX! -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |