[jira] [Created] (FLINK-20377) flink-1.11.2 -kerberos config on kafka connector not working


Shang Yuanchun (Jira)
谢波 created FLINK-20377:
--------------------------

             Summary: flink-1.11.2 -kerberos config on kafka connector not working
                 Key: FLINK-20377
                 URL: https://issues.apache.org/jira/browse/FLINK-20377
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Table SQL / Ecosystem
    Affects Versions: 1.11.2
         Environment: Flink on YARN, Kafka with Kerberos, flink-1.11.2_2.11
            Reporter: 谢波
             Fix For: 1.12.0


I followed the official documentation to configure the Kafka connector and flink-conf.yaml, but the Kerberos configuration does not work.

My table config:

WITH (
 'connector' = 'kafka',
 'properties.bootstrap.servers' = 'xxxxxxxx',
 'topic' = 'kafka_hepecc_ekko_cut_json',
 'properties.group.id' = 'ekko.group',
 'properties.security.protocol' = 'SASL_PLAINTEXT',
 'properties.sasl.kerberos.service.name' = 'kafka',
-- 'properties.sasl.mechanism' = 'GSSAPI',
 'format' = 'json'
);
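
For context, the Kafka table connector documentation describes options prefixed with `properties.` as being passed to the underlying Kafka client with the prefix removed. The sketch below illustrates that mapping for the options in this report, using only the JDK. The class and method names (`KafkaOptionsSketch`, `toKafkaProperties`) are hypothetical, and this is not Flink's actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Flink: shows how table options carrying the
// 'properties.' prefix become plain Kafka client properties.
final class KafkaOptionsSketch {
    private static final String PREFIX = "properties.";

    static Properties toKafkaProperties(Map<String, String> tableOptions) {
        Properties kafkaProps = new Properties();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                // Strip the prefix; the remainder is the Kafka client config key.
                kafkaProps.setProperty(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return kafkaProps;
    }

    public static void main(String[] args) {
        // Options copied from the WITH clause in this report.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("properties.bootstrap.servers", "xxxxxxxx");
        options.put("topic", "kafka_hepecc_ekko_cut_json");
        options.put("properties.group.id", "ekko.group");
        options.put("properties.security.protocol", "SASL_PLAINTEXT");
        options.put("properties.sasl.kerberos.service.name", "kafka");
        options.put("format", "json");

        // Print the properties the Kafka client would receive.
        toKafkaProperties(options).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With these options the client would see `bootstrap.servers`, `group.id`, `security.protocol` and `sasl.kerberos.service.name`. Because `properties.sasl.mechanism` is commented out, the Kafka client default for `sasl.mechanism` (GSSAPI) applies.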

flink-conf.yaml:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/xiebo/module/flink/keytab/xiebo.keytab
security.kerberos.login.principal: [hidden email]

# The configuration below defines which JAAS login contexts

security.kerberos.login.contexts: Client,KafkaClient
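
The `security.kerberos.login.contexts` value names the JAAS login contexts that should receive the keytab credentials, and `KafkaClient` is the context the Kafka client looks up. As a rough illustration of what a keytab-based entry for such a context contains, the sketch below builds one with the JDK's `AppConfigurationEntry` and the standard `Krb5LoginModule` options. It is an assumption-laden equivalent, not Flink's internal implementation; the class name `KeytabJaasSketch` and the principal `user@EXAMPLE.COM` are placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

// Hypothetical helper: builds a keytab-based JAAS entry using documented
// Krb5LoginModule options. Not taken from Flink's source.
final class KeytabJaasSketch {
    static AppConfigurationEntry keytabEntry(String keytabPath, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");        // read keys from the keytab file
        options.put("keyTab", keytabPath);
        options.put("principal", principal);
        options.put("storeKey", "true");
        options.put("doNotPrompt", "true");      // never fall back to a password prompt
        options.put("useTicketCache", "false");  // mirrors use-ticket-cache: false
        return new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options);
    }

    public static void main(String[] args) {
        // Keytab path from this report; the principal is a placeholder.
        AppConfigurationEntry entry = keytabEntry(
                "/home/xiebo/module/flink/keytab/xiebo.keytab", "user@EXAMPLE.COM");
        System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
    }
}
```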

 

Contents of the keytab directory:

[xiebo@ww021 keytab]$ pwd
/home/xiebo/module/flink/keytab
[xiebo@ww021 keytab]$ ll
total 12
-rw-r--r-- 1 xiebo bigdata_dev 486 Nov 26 18:15 kafka_client_jaas.conf
-rw-r--r-- 1 xiebo bigdata_dev 568 Nov 26 14:10 krb5.conf
-rw-r--r-- 1 xiebo bigdata_dev 436 Nov 26 15:14 xiebo.keytab

 

I get the following error:

 
2020-11-26 19:01:55
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1111)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
    ... 23 more
Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user

    at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:901)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:764)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:103)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
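
In this trace the exception is raised from `Krb5LoginModule.promptForPass`, which suggests the login module fell back to asking for a password. That typically happens when it could not obtain keys for the principal from a keytab. One thing worth ruling out, as an assumption rather than a confirmed cause of this issue, is whether the keytab is present and readable on the machine where the task actually runs. The sketch below is a JDK-only check; the class name `KeytabCheckSketch` is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical diagnostic: reports basic file-level problems with a keytab path.
// It does not validate the keytab's contents or the principal stored in it.
final class KeytabCheckSketch {
    static String describe(Path keytab) {
        if (!Files.exists(keytab)) {
            return "missing";
        }
        if (!Files.isRegularFile(keytab)) {
            return "not a regular file";
        }
        if (!Files.isReadable(keytab)) {
            return "not readable";
        }
        try {
            return Files.size(keytab) == 0 ? "empty" : "ok";
        } catch (IOException e) {
            return "unreadable: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Defaults to the keytab path given in this report.
        Path keytab = Paths.get(
                args.length > 0 ? args[0] : "/home/xiebo/module/flink/keytab/xiebo.keytab");
        System.out.println(keytab + ": " + describe(keytab));
    }
}
```

A result of `ok` only shows that the file can be read by the current user on that host. It says nothing about whether the keytab contains keys for the configured principal.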
 

References:

[https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html]

[https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems]

[https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#enabling-kerberos-authentication]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)