hi, community:
The implements of closable blocking queue[1] use a fair lock ReentrantLock to guarantee thread-safe. The changes below may be help:
1. replace ReentrantLock with ReentrantReadWriteLock to improve concurrency: in fair mode, the ReentrantReadWriteLock has better performance than ReentrantLock;[2]
2. replace singnalAll() with signal() to reduce the thread scheduling: signal() is safe and reasonable, since all the threads which waiting on the nonEmpty Condition wish to take element from queue, and at most one thread can get element from queue.
jira:
https://issues.apache.org/jira/browse/FLINK-19089[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java[2]
http://isuru-perera.blogspot.com/2016/05/benchmarking-java-locks-with-counters.html--
Best,
kui