[jira] [Created] (FLINK-15111) java.lang.RuntimeException for KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80) when using SASL_SSL

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-15111) java.lang.RuntimeException for KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80) when using SASL_SSL

Shang Yuanchun (Jira)
Hunter Kempf created FLINK-15111:
------------------------------------

             Summary: java.lang.RuntimeException for KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)  when using SASL_SSL
                 Key: FLINK-15111
                 URL: https://issues.apache.org/jira/browse/FLINK-15111
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.9.0
         Environment: Flink Version 1.9.0

Scala Version 2.11.12

Kafka Cluster Version 2.3.0
            Reporter: Hunter Kempf


Goal: I am trying to connect a flink job I made to a remote kafka cluster that has 3 partitions and requires authentication with SASL_SSL.

Background: I have tested my job against a kafka cluster topic running on my localhost that has one partition as well as one that had three partitions and it works to read and write to the local kafka.

Problem: When I attempt to connect to a topic that has multiple partitions and SASL_SSL I get the following error (for reference topicName is the name of the topic I am trying to consume). Weirdly I dont have any issues when I am trying to produce to a remote multi-partition topic.

 

```

 

{{java.lang.RuntimeException: topicName
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)}}

```

 

My Consumer Code looks like this :

 

```

{{  def defineKafkaDataStream[A: TypeInformation](topic: String,
                                                env: StreamExecutionEnvironment,
                                                SASL_username:String,
                                                SASL_password:String,
                                                kafkaBootstrapServer: String = "localhost:9092",
                                                zookeeperHost: String = "localhost:2181",
                                                groupId: String = "test"
                                               )(implicit c: JsonConverter[A]): DataStream[A] = \{
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
    properties.setProperty("security.protocol" , "SASL_SSL")
    properties.setProperty("sasl.mechanism" , "PLAIN")
    val jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"
    val jaasConfig = String.format(jaasTemplate, SASL_username, SASL_password)
    properties.setProperty("sasl.jaas.config", jaasConfig)
    properties.setProperty("group.id", "MyConsumerGroup")

    env
      .addSource(new FlinkKafkaConsumer(topic, new JSONKeyValueDeserializationSchema(true), properties))
      .map(x => x.convertTo[A](c))
  }}}

```

Is there some property I should be setting that I am not? Since it worked fine in Local I assume this is a bug with the interaction of the partition discoverer when authenticating with SASL_SSL but if it is something I can fix with a setting please let me know what to change

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)