Caizhi Weng created FLINK-13236:
-----------------------------------
Summary: Fix bug and improve performance in TopNBuffer
Key: FLINK-13236
URL: https://issues.apache.org/jira/browse/FLINK-13236
Project: Flink
Issue Type: Improvement
Reporter: Caizhi Weng
Assignee: Caizhi Weng
In {{TopNBuffer}} we have the following method:
{code:java}
/**
 * Puts a record list into the buffer under the sortKey.
 * Note: if buffer already contains sortKey, putAll will overwrite the previous value
 *
 * @param sortKey sort key with which the specified values are to be associated
 * @param values record lists to be associated with the specified key
 */
void putAll(BaseRow sortKey, Collection<BaseRow> values) {
    treeMap.put(sortKey, values);
    currentTopNum += values.size();
}
{code}
When {{sortKey}} already exists in {{treeMap}}, {{treeMap.put}} overwrites the old collection, so {{currentTopNum}} must first be reduced by the size of the old collection before {{values.size()}} is added. Adding {{values.size()}} directly overcounts. The bug is not triggered at the moment because the only caller is {{AppendOnlyTopNFunction}}, which uses this method in its init procedure.
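A minimal sketch of one possible fix, not the actual patch. {{TopNBufferPutAllSketch}} and {{getCurrentTopNum}} are hypothetical names, and {{String}} stands in for {{BaseRow}} so the example is self-contained. It relies on {{TreeMap.put}} returning the previously mapped value, or null if there was none.

```java
import java.util.Collection;
import java.util.TreeMap;

// Hypothetical stand-in for TopNBuffer; String replaces BaseRow for illustration only.
class TopNBufferPutAllSketch {
    private final TreeMap<String, Collection<String>> treeMap = new TreeMap<>();
    private int currentTopNum = 0;

    void putAll(String sortKey, Collection<String> values) {
        // TreeMap.put returns the value previously mapped to sortKey, or null if absent.
        Collection<String> old = treeMap.put(sortKey, values);
        if (old != null) {
            // The old records were overwritten, so they no longer count.
            currentTopNum -= old.size();
        }
        currentTopNum += values.size();
    }

    int getCurrentTopNum() {
        return currentTopNum;
    }
}
```

With this change, calling {{putAll}} twice for the same key leaves {{currentTopNum}} equal to the size of the second collection only.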
{code:java}
/**
 * Gets record which rank is given value.
 *
 * @param rank rank value to search
 * @return the record which rank is given value
 */
BaseRow getElement(int rank) {
    int curRank = 0;
    Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = treeMap.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
        Collection<BaseRow> list = entry.getValue();
        Iterator<BaseRow> listIter = list.iterator();
        while (listIter.hasNext()) {
            BaseRow elem = listIter.next();
            curRank += 1;
            if (curRank == rank) {
                return elem;
            }
        }
    }
    return null;
}
{code}
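In {{getElement}}, we can avoid running the inner loop for every entry by increasing {{curRank}} by {{list.size()}} each time, and only looking inside the collection that contains the requested rank.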
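The idea can be sketched as follows. This is an illustration under assumptions, not the actual patch: {{TopNBufferGetElementSketch}} and its {{put}} helper are hypothetical, and {{String}} stands in for {{BaseRow}}. Because {{Collection}} has no indexed access, the sketch still iterates inside the single entry that holds the requested rank, but skips every other entry in one step.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for TopNBuffer; String replaces BaseRow for illustration only.
class TopNBufferGetElementSketch {
    private final TreeMap<String, Collection<String>> treeMap = new TreeMap<>();

    // Helper used only to populate the sketch.
    void put(String sortKey, Collection<String> values) {
        treeMap.put(sortKey, values);
    }

    String getElement(int rank) {
        if (rank < 1) {
            return null; // ranks are 1-based, as in the original loop
        }
        int curRank = 0;
        for (Map.Entry<String, Collection<String>> entry : treeMap.entrySet()) {
            Collection<String> list = entry.getValue();
            int size = list.size();
            if (curRank + size < rank) {
                // The requested rank lies beyond this entry: skip it in one step.
                curRank += size;
                continue;
            }
            // The requested rank lies inside this entry: walk only this collection.
            for (String elem : list) {
                curRank += 1;
                if (curRank == rank) {
                    return elem;
                }
            }
        }
        return null; // rank exceeds the number of buffered records
    }
}
```

The cost becomes proportional to the number of sort keys plus the size of one collection, instead of the total number of buffered records.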
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)