Flink table error in 1.11 "Could not find a suitable table factory "

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink table error in 1.11 "Could not find a suitable table factory "

Ravi Sankar Reddy Sangana
Hi Team,

I am trying Flink 1.11 by using the official doc and examples. Here is my code.

While executing I am getting this error.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.properties.auto.offset.reset=latest
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testflinktopurl
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=badbotstream
connector.type=kafka
connector.version=0.11
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=sid
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=_zpsbd2
schema.10.data-type=VARCHAR(2147483647)
schema.10.name=noqryurl
schema.11.data-type=VARCHAR(2147483647)
schema.11.name=_zpsbd8
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=_zpsbd3
schema.3.data-type=VARCHAR(2147483647)
schema.3.name=_zpsbd6
schema.4.data-type=VARCHAR(2147483647)
schema.4.name=_zpsbd7
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=sign
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=rule
schema.7.data-type=VARCHAR(2147483647)
schema.7.name=_blockdigest
schema.8.data-type=VARCHAR(2147483647)
schema.8.name=recvdTime
schema.9.data-type=VARCHAR(2147483647)
schema.9.name=botcode
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
        at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:162) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:107) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:91) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at java.util.Optional.map(Optional.java:215) ~[?:1.8.0_265]
        at org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:82) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) ~[flink-table_2.11-1.11.2.jar:1.11.2]
        at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) ~[flink-table_2.11-1.11.2.jar:1.11.2]


StreamingJob.java (main class)
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);   //Model 1
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

                fsTableEnv.connect(new Kafka()
                .version("0.11")
                .topic("badbotstream")
                .property("auto.offset.reset", "latest")
                .property("bootstrap.servers", "localhost:9092")
                .property("group.id", "testflinktopurl")
                .startFromLatest())
                .withFormat(new Json().failOnMissingField(false))
                .withSchema(new Schema()
                        .field("sid",DataTypes.STRING())
                        .field("_zpsbd2", DataTypes.STRING())
                        .field("_zpsbd3", DataTypes.STRING())
                        .field("_zpsbd6", DataTypes.STRING())
                        .field("_zpsbd7", DataTypes.STRING())
                        .field("sign", DataTypes.STRING())
                        .field("rule", DataTypes.STRING())
                        .field("_blockdigest", DataTypes.STRING())
                        .field("recvdTime", DataTypes.STRING())
                        .field("botcode", DataTypes.STRING())
                        .field("noqryurl", DataTypes.STRING())
                        .field("_zpsbd8", DataTypes.STRING()))
                .inAppendMode().createTemporaryTable("badbot");

        // TableResult dsRow = fsTableEnv.executeSql("select * from badbot where sid = '4859'"); //Model 1
        Table badBot = fsTableEnv.sqlQuery("select * from badbot where sid = '4859'");
        DataStream<Row> dsRow = fsTableEnv.toAppendStream(badBot, Row.class)
        dsRow.print();

        fsEnv.execute("pacifier");



Pom.xml

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>