[jira] [Created] (FLINK-13340) Add more Kafka topic option of flink-connector-kafka

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-13340) Add more Kafka topic option of flink-connector-kafka

Shang Yuanchun (Jira)
DuBin created FLINK-13340:
-----------------------------

             Summary: Add more Kafka topic option of flink-connector-kafka
                 Key: FLINK-13340
                 URL: https://issues.apache.org/jira/browse/FLINK-13340
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.8.1
            Reporter: DuBin


Currently, only 'topic' option implemented in the Kafka Connector Descriptor, we can only use it like :
{code:java}
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    tableEnv
      .connect(
        new Kafka()
          .version("0.11")
          .topic("test-flink-1")
          .startFromEarliest()
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092"))
      .withFormat(
        new Json()
          .deriveSchema()
      )
      .withSchema(
        new Schema()
          .field("name", Types.STRING)
          .field("age", Types.STRING)
      ){code}
but we cannot consume multiple topics or a topic regex pattern. 

Here is my thoughts:
{code:java}
          .topic("test-flink-1")
          //.topics("test-flink-1,test-flink-2") or topics(List<String> topics)
          //.subscriptionPattern("test-flink-.*") or subscriptionPattern(Pattern pattern)
{code}
I already implement the code on my local env with help of the FlinkKafkaConsumer, and it works.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)