DuBin created FLINK-13340:
-----------------------------
Summary: Add more Kafka topic option of flink-connector-kafka
Key: FLINK-13340
URL:
https://issues.apache.org/jira/browse/FLINK-13340 Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.8.1
Reporter: DuBin
Currently, only 'topic' option implemented in the Kafka Connector Descriptor, we can only use it like :
{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
tableEnv
.connect(
new Kafka()
.version("0.11")
.topic("test-flink-1")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
.withFormat(
new Json()
.deriveSchema()
)
.withSchema(
new Schema()
.field("name", Types.STRING)
.field("age", Types.STRING)
){code}
but we cannot consume multiple topics or a topic regex pattern.
Here is my thoughts:
{code:java}
.topic("test-flink-1")
//.topics("test-flink-1,test-flink-2") or topics(List<String> topics)
//.subscriptionPattern("test-flink-.*") or subscriptionPattern(Pattern pattern)
{code}
I already implement the code on my local env with help of the FlinkKafkaConsumer, and it works.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)