jiang7chengzitc created FLINK-20224:
--------------------------------------- Summary: add username&password to provide a credential for es rest client Key: FLINK-20224 URL: https://issues.apache.org/jira/browse/FLINK-20224 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Affects Versions: 1.11.0 Reporter: jiang7chengzitc Fix For: 1.11.3 hello, Flink ElasticSearch Connector use Java High Level REST Client to process request for index, delete, get, update, etc. but some ES clusters (version 6 and higher) require security credentials to connect, So it can be considered to add username and password option to build security credentials, then connect to this ES cluster. for example: {code:java} //代码占位符 org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink @Override protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction( List<Host> hosts, ActionRequestFailureHandler failureHandler, Map<SinkOption, String> sinkOptions, ElasticsearchUpsertSinkFunction upsertSinkFunction) { ...... builder.setRestClientFactory( new DefaultRestClientFactory( Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT)) .map(Integer::valueOf) .orElse(null), sinkOptions.get(REST_PATH_PREFIX), sinkOptions.get(USERNAME), sinkOptions.get(PASSWORD))); ...... } @VisibleForTesting static class DefaultRestClientFactory implements RestClientFactory { private Integer maxRetryTimeout; private String pathPrefix; private String username; private String password; public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix,@Nullable String username, @Nullable String password) { this.maxRetryTimeout = maxRetryTimeout; this.pathPrefix = pathPrefix; this.username = username; this.password = password; } @Override public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { if (maxRetryTimeout != null) { restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout); } if (pathPrefix != null) { restClientBuilder.setPathPrefix(pathPrefix); } // build credentialsProvider if (username != null && password != null) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) ); } } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } DefaultRestClientFactory that = (DefaultRestClientFactory) o; return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) && Objects.equals(pathPrefix, that.pathPrefix) && Objects.equals(username, that.username) && Objects.equals(password, that.password); } @Override public int hashCode() { return Objects.hash( maxRetryTimeout, pathPrefix, username, password); } }{code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |