[jira] [Created] (FLINK-14012) Failed to start job for consuming Secure Kafka after the job cancel



Shang Yuanchun (Jira)
Daebeom Lee created FLINK-14012:
-----------------------------------

             Summary: Failed to start job for consuming Secure Kafka after the job cancel
                 Key: FLINK-14012
                 URL: https://issues.apache.org/jira/browse/FLINK-14012
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.9.0
         Environment: * Kubernetes 1.13.2
 * Flink 1.9.0
 * Kafka client libary 2.2.0
            Reporter: Daebeom Lee


Hello, this is Daebeom Lee.
h2. Background

I installed Flink 1.9.0 on our Kubernetes cluster.

We use a Flink session cluster: we build a fat JAR, upload it through the web UI, and run several jobs.

At first, the jobs start fine.

But after we cancel some jobs and resubmit them, they fail to start.

This is the error:


{code:java}
java.lang.NoClassDefFoundError: org/apache/kafka/common/security/scram/internals/ScramSaslClient
    at org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
    at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
    at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
    at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
{code}
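The missing class in the trace above can be probed directly. The following is a standalone diagnostic sketch (a hypothetical helper, not part of this report or of Flink): it checks whether a class name is resolvable from the current thread's context classloader, which is the kind of lookup the Kafka client performs when it instantiates the SCRAM SASL client.

```java
// Hypothetical diagnostic: probe class visibility from the context
// classloader. Run it from inside the job (or any JVM) to see whether
// the SCRAM client class is resolvable at that point.
public class ScramClassCheck {

    // Returns true if the named class can be resolved from the current
    // thread's context classloader, false if it cannot.
    public static boolean isVisible(String className) {
        try {
            Class.forName(className, false,
                    Thread.currentThread().getContextClassLoader());
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String scram =
            "org.apache.kafka.common.security.scram.internals.ScramSaslClient";
        System.out.println(scram + " visible: " + isVisible(scram));
    }
}
```

If this prints `visible: false` inside a restarted job while the class was present at first submission, that points at the user classloader having been released, consistent with the workaround below.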
h2. Our workaround
 * I think this is a Flink JVM classloader issue.
 * The user classloader is unloaded when a job is canceled, and the Kafka client library was bundled inside the fat JAR.
 * So I placed the Kafka client library in /opt/flink/lib 
 ** /opt/flink/lib/kafka-clients-2.2.0.jar
 * After that, the issue was resolved.
 * But there are some odd points:
 ** Flink 1.8.1 had no problem two weeks ago.
 ** One week ago I rolled back from 1.9.0 to 1.8.1, and the same error occurred.
 ** Maybe the Docker image changed in the Docker repository ([https://github.com/docker-flink/docker-flink])
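The workaround above can be sketched as an image-build step (a minimal sketch assuming the official Flink Docker image layout; the jar filename and base image tag are illustrative):

```shell
# Dockerfile fragment: put kafka-clients on Flink's parent classpath
# (/opt/flink/lib) so it is loaded by the Flink framework classloader
# and survives job cancellation, instead of living in the job's fat JAR.
#
#   FROM flink:1.9.0
#   COPY kafka-clients-2.2.0.jar /opt/flink/lib/
#
# Equivalent one-off change on a running container:
docker cp kafka-clients-2.2.0.jar taskmanager:/opt/flink/lib/
```

For this to help, the same dependency should also be excluded from the fat JAR (e.g. `provided` scope in Maven), so only one copy of the Kafka classes exists and it is loaded by the parent classloader.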

 
h2. Suggestion
 * I'd like to know the exact reason this error occurs after upgrading to 1.9.0.
 * Does anybody know a better solution in this case?

 

Thank you in advance.

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)