[jira] [Created] (FLINK-19132) Failed to start jobs for consuming Secure Kafka after cluster restart



Shang Yuanchun (Jira)
Olivier Zembri created FLINK-19132:
--------------------------------------

             Summary: Failed to start jobs for consuming Secure Kafka after cluster restart
                 Key: FLINK-19132
                 URL: https://issues.apache.org/jira/browse/FLINK-19132
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.10.1, 1.9.1
            Reporter: Olivier Zembri


We deploy Flink jobs packaged as fat jar files compiled with Java 1.8 on a Flink session cluster in Kubernetes.

After restarting the Kubernetes cluster, the jobs fail to start, and several NoClassDefFoundError errors appear in the TaskManager log.
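The NoClassDefFoundError suggests the Kafka classes are no longer visible to the classloader running the task. A small diagnostic sketch (not part of this report; {{ClassProbe}} is a hypothetical helper, and the default class name is taken from the stack trace below) to check whether a given class is loadable and where it was loaded from:

```java
// Diagnostic sketch: probe whether a class is loadable by the current
// classloader and report its code source (null for bootstrap classes).
public class ClassProbe {
    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.apache.kafka.common.security.scram.ScramSaslClient";
        try {
            Class<?> c = Class.forName(name);
            System.out.println("Loaded " + name + " from "
                    + c.getProtectionDomain().getCodeSource());
        } catch (ClassNotFoundException e) {
            System.out.println("Not found: " + name);
        }
    }
}
```

Running this inside the TaskManager container (on the same classpath as the failing job) would show whether the Kafka client classes are reachable at all, which is what the workaround below changes.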

*Stack trace*
{code:java}
java.lang.NoClassDefFoundError: org.apache.kafka.common.security.scram.ScramSaslClient
    at org.apache.kafka.common.security.scram.ScramSaslClient.evaluateChallenge(ScramSaslClient.java:128)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:280)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:278)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:278)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslToken(SaslClientAuthenticator.java:215)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:189)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:76)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:376)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:314)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1386)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748){code}


*Workaround:*

- Copy the jar file containing the missing classes into the /lib folder:
      /opt/flink/lib/kafka-clients-0.11.0.jar
- Update the [flink-conf.yaml|https://github.ibm.com/dba/taiga-flink/blob/master/conf/flink-conf.yaml] with:
{{classloader.parent-first-patterns.additional: org.apache.kafka}}

_Note:_ This issue is very similar to https://issues.apache.org/jira/browse/FLINK-14012.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)