Caizhi Weng created FLINK-18668:
-----------------------------------

             Summary: BytesHashMap#growAndRehash should release newly allocated segments before throwing the exception
                 Key: FLINK-18668
                 URL: https://issues.apache.org/jira/browse/FLINK-18668
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.11.0
            Reporter: Caizhi Weng


In {{BytesHashMap#growAndRehash}} we have the following code.

{code:java}
List<MemorySegment> newBucketSegments = new ArrayList<>(required);

try {
    int numAllocatedSegments = required - memoryPool.freePages();
    if (numAllocatedSegments > 0) {
        throw new MemoryAllocationException();
    }
    int needNumFromFreeSegments = required - newBucketSegments.size();
    for (int end = needNumFromFreeSegments; end > 0; end--) {
        newBucketSegments.add(memoryPool.nextSegment());
    }

    setBucketVariables(newBucketSegments);
} catch (MemoryAllocationException e) {
    LOG.warn("BytesHashMap can't allocate {} pages, and now used {} pages",
            required, reservedNumBuffers);
    throw new EOFException();
}
{code}

Newly allocated memory segments are temporarily stored in {{newBucketSegments}} before being handed to the hash table. But if allocation fails partway through the loop, the segments already added to {{newBucketSegments}} are not returned to the memory pool. In the stack trace below, the failure surfaces from {{LazyMemorySegmentPool#nextSegment}} as a {{RuntimeException}} wrapping the {{MemoryAllocationException}}, and disposing the operator afterwards reports that pages are still in use:

{code}
java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
    at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:84) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.growAndRehash(BytesHashMap.java:393) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.append(BytesHashMap.java:313) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at HashAggregateWithKeys$360.processElement(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:560) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
    Suppressed: java.lang.RuntimeException: Should return all used memory before clean, page used: 2814
        at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.close(LazyMemorySegmentPool.java:99) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.free(BytesHashMap.java:486) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.free(BytesHashMap.java:475) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at HashAggregateWithKeys$360.close(Unknown Source) ~[?:?]
        at org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:44) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:707) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:687) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:626) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:542) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
    at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    ... 15 more
Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 16777216 bytes, only 0 bytes are remaining
    at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:159) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:85) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:227) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
    ... 15 more
{code}

We should first return these segments to the memory pool before throwing the exception.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
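A minimal, self-contained sketch of the "release before throwing" pattern proposed in this issue. It is not Flink's actual patch: {{Segment}}, {{ToyPool}} and {{allocateBuckets}} are hypothetical stand-ins for {{MemorySegment}}, {{LazyMemorySegmentPool}} and {{growAndRehash}}. As an assumption, the toy pool's {{freePages()}} can promise more pages than it can actually back, which models how the up-front check can pass while {{nextSegment}} still fails with a {{RuntimeException}}, as in the trace above.

```java
import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the "release before throwing" fix. Segment, ToyPool and
 * allocateBuckets are illustrative stand-ins, not Flink classes or methods.
 */
public class GrowAndRehashSketch {

    /** Stand-in for a MemorySegment. */
    static final class Segment {}

    /** Toy pool whose freePages() can promise more than it can actually deliver. */
    static final class ToyPool {
        private final int quota;     // pages this pool claims to have
        private final int available; // pages the (hypothetical) shared budget can really back
        private int used;

        ToyPool(int quota, int available) {
            this.quota = quota;
            this.available = available;
        }

        int freePages() {
            return quota - used;
        }

        int usedPages() {
            return used;
        }

        /** Hands out one segment, or fails with a RuntimeException wrapping the cause. */
        Segment nextSegment() {
            if (used >= available) {
                throw new RuntimeException(new IllegalStateException("Could not allocate page"));
            }
            used++;
            return new Segment();
        }

        /** Takes back every segment in the list and empties the list. */
        void returnAll(List<Segment> segments) {
            used -= segments.size();
            segments.clear();
        }
    }

    /** Allocates {@code required} segments; on failure, returns the partial allocation first. */
    static List<Segment> allocateBuckets(ToyPool pool, int required) throws EOFException {
        if (required > pool.freePages()) {
            // Nothing has been allocated yet, so there is nothing to release.
            throw new EOFException("can't allocate " + required + " pages");
        }
        List<Segment> newBucketSegments = new ArrayList<>(required);
        try {
            for (int i = 0; i < required; i++) {
                newBucketSegments.add(pool.nextSegment());
            }
            return newBucketSegments;
        } catch (RuntimeException e) {
            // The fix: hand the partially allocated segments back before propagating.
            pool.returnAll(newBucketSegments);
            EOFException eof = new EOFException("can't allocate " + required + " pages");
            eof.initCause(e);
            throw eof;
        }
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool(8, 3); // claims 8 free pages, can only back 3
        try {
            allocateBuckets(pool, 5);
        } catch (EOFException e) {
            // prints: allocation failed, pages still in use: 0
            System.out.println("allocation failed, pages still in use: " + pool.usedPages());
        }
    }
}
```

Without the {{pool.returnAll(newBucketSegments)}} call, the sketch would report 3 pages still in use after the failure, which is the toy analogue of the "Should return all used memory before clean" error above. Catching {{RuntimeException}} here is a design choice for the sketch, since that is how the failure surfaces from {{nextSegment}} in the trace.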