谢波 created FLINK-20377:
--------------------------

             Summary: flink-1.11.2 -kerberos config on kafka connector not working
                 Key: FLINK-20377
                 URL: https://issues.apache.org/jira/browse/FLINK-20377
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Table SQL / Ecosystem
    Affects Versions: 1.11.2
         Environment: flink on yarn
                      kafka with kerberos
                      flink-1.11.2_2.11
            Reporter: 谢波
             Fix For: 1.12.0


I followed the configuration on the official website to configure Kafka and flink-conf.yaml, but the configuration does not work.

My table config:

    WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'xxxxxxxx',
      'topic' = 'kafka_hepecc_ekko_cut_json',
      'properties.group.id' = 'ekko.group',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.kerberos.service.name' = 'kafka',
      -- 'properties.sasl.mechanism' = 'GSSAPI',
      'format' = 'json'
    );

yaml:

    security.kerberos.login.use-ticket-cache: false
    security.kerberos.login.keytab: /home/xiebo/module/flink/keytab/xiebo.keytab
    security.kerberos.login.principal: [hidden email]
    # The configuration below defines which JAAS login contexts
    security.kerberos.login.contexts: Client,KafkaClient

Directory content:

    [xiebo@ww021 keytab]$ pwd
    /home/xiebo/module/flink/keytab
    [xiebo@ww021 keytab]$ ll
    total 12
    -rw-r--r-- 1 xiebo bigdata_dev 486 Nov 26 18:15 kafka_client_jaas.conf
    -rw-r--r-- 1 xiebo bigdata_dev 568 Nov 26 14:10 krb5.conf
    -rw-r--r-- 1 xiebo bigdata_dev 436 Nov 26 15:14 xiebo.keytab

I get an error:

    2020-11-26 19:01:55
    org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
        at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1111)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
        at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
        ... 23 more
    Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:901)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:764)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:103)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)

References:

[https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html]
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems]
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#enabling-kerberos-authentication]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)