[jira] [Created] (FLINK-2685) TaskManager deadlock on NetworkBufferPool

Shang Yuanchun (Jira)
Greg Hogan created FLINK-2685:
---------------------------------

             Summary: TaskManager deadlock on NetworkBufferPool
                 Key: FLINK-2685
                 URL: https://issues.apache.org/jira/browse/FLINK-2685
             Project: Flink
          Issue Type: Bug
          Components: Distributed Runtime
    Affects Versions: master
            Reporter: Greg Hogan


This deadlock occurs intermittently. I have a {join} followed by a {chain<join,filter>} followed by a {reduceGroup}. Stack traces and local variables from one thread of each {join} are below.

The {join}s are waiting on a buffer to become available ({networkBufferPool.availableMemorySegments.count=0}). Both {LocalBufferPool}s have been given extra capacity ({currentPoolSize=60 > numberOfRequiredMemorySegments=32}). The first {join} is at full capacity ({currentPoolSize=numberOfRequestedMemorySegments=60}) yet the second {join} has not acquired any ({numberOfRequestedMemorySegments=0}).

{LocalBufferPool.returnExcessMemorySegments} only recycles the {MemorySegment}s currently held in its own {availableMemorySegments} queue, so segments that have already been handed out as {Buffer}s are released only when those {Buffer}s are explicitly recycled.
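To make the accounting above easier to follow, here is a minimal single-threaded sketch of it. {GlobalPool}, {LocalPool} and {tryRequestBuffer} are hypothetical stand-ins, not Flink classes: the field names mirror the debugger output below, but the logic is a simplified assumption. In particular it assumes that shrinking a pool via {setNumBuffers} calls {returnExcessMemorySegments}, and that {recycle} hands a segment back to the global pool while the local pool is over its size. Where the real {requestBuffer} would block, the sketch returns {null}. The sizes (64 segments, two pools of 32) are made up and do not reproduce the reported state.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical, simplified model of NetworkBufferPool / LocalBufferPool accounting.
 * Not Flink code: field names follow the debugger output, the logic is an assumption.
 */
public class BufferPoolStarvation {

    /** Stand-in for NetworkBufferPool: a shared queue of free segments. */
    static final class GlobalPool {
        final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();

        GlobalPool(int totalNumberOfMemorySegments, int memorySegmentSize) {
            for (int i = 0; i < totalNumberOfMemorySegments; i++) {
                availableMemorySegments.add(new byte[memorySegmentSize]);
            }
        }
    }

    /** Stand-in for LocalBufferPool. */
    static final class LocalPool {
        final GlobalPool networkBufferPool;
        final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
        int currentPoolSize;
        int numberOfRequestedMemorySegments; // segments taken from the global pool

        LocalPool(GlobalPool global, int currentPoolSize) {
            this.networkBufferPool = global;
            this.currentPoolSize = currentPoolSize;
        }

        /** Returns a segment, or null where the real requestBuffer() would wait. */
        byte[] tryRequestBuffer() {
            byte[] segment = availableMemorySegments.poll();
            if (segment == null && numberOfRequestedMemorySegments < currentPoolSize) {
                segment = networkBufferPool.availableMemorySegments.poll();
                if (segment != null) {
                    numberOfRequestedMemorySegments++;
                }
            }
            return segment;
        }

        /** Assumed behavior: an oversized pool gives a recycled segment back globally. */
        void recycle(byte[] segment) {
            if (numberOfRequestedMemorySegments > currentPoolSize) {
                numberOfRequestedMemorySegments--;
                networkBufferPool.availableMemorySegments.add(segment);
            } else {
                availableMemorySegments.add(segment);
            }
        }

        /** Assumed: shrinking the pool triggers returnExcessMemorySegments(). */
        void setNumBuffers(int numBuffers) {
            currentPoolSize = numBuffers;
            returnExcessMemorySegments();
        }

        /** Only segments in the local queue can be reclaimed here. */
        void returnExcessMemorySegments() {
            while (numberOfRequestedMemorySegments > currentPoolSize) {
                byte[] segment = availableMemorySegments.poll();
                if (segment == null) {
                    return; // the rest are out as Buffers and return only via recycle()
                }
                numberOfRequestedMemorySegments--;
                networkBufferPool.availableMemorySegments.add(segment);
            }
        }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(64, 32);
        LocalPool first = new LocalPool(global, 64);

        // The first pool hands out every segment as a Buffer (e.g. queued for a consumer).
        ArrayDeque<byte[]> inFlight = new ArrayDeque<>();
        for (int i = 0; i < 64; i++) {
            inFlight.add(first.tryRequestBuffer());
        }

        // A second pool is registered and the 64 segments are redistributed 32/32.
        first.setNumBuffers(32);
        LocalPool second = new LocalPool(global, 32);

        System.out.println("first requested:  " + first.numberOfRequestedMemorySegments);
        System.out.println("global available: " + global.availableMemorySegments.size());
        System.out.println("second gets a buffer: " + (second.tryRequestBuffer() != null));

        // Only an explicit recycle releases a segment back to the global pool.
        first.recycle(inFlight.poll());
        System.out.println("after one recycle, second gets a buffer: "
                + (second.tryRequestBuffer() != null));
    }
}
```

In this model the second pool is entitled to 32 segments yet cannot obtain any ({numberOfRequestedMemorySegments=0}, global count 0) until the first pool explicitly recycles a {Buffer}, which is the pattern visible in the variable dumps.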

First join stack trace and variable values from {LocalBufferPool.requestBuffer}:
{noformat}
owns: SpanningRecordSerializer<T>  (id=723)
waiting for: ArrayDeque<E>  (id=724)
Object.wait(long) line: not available [native method]
LocalBufferPool.requestBuffer(boolean) line: 163
LocalBufferPool.requestBufferBlocking() line: 133
RecordWriter<T>.emit(T) line: 92
OutputCollector<T>.collect(T) line: 65
JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
JoinDriver<IT1,IT2,OT>.run() line: 208
RegularPactTask<S,OT>.run() line: 489
RegularPactTask<S,OT>.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}

{noformat}
this LocalBufferPool  (id=403)
        availableMemorySegments ArrayDeque<E>  (id=398)
                elements Object[16]  (id=422)
                head 14
                tail 14
        currentPoolSize 60
        isDestroyed false
        networkBufferPool NetworkBufferPool  (id=354)
                allBufferPools HashSet<E>  (id=424)
                availableMemorySegments ArrayBlockingQueue<E>  (id=427)
                        count 0
                        items Object[10240]  (id=674)
                        itrs null
                        lock ReentrantLock  (id=675)
                        notEmpty AbstractQueuedSynchronizer$ConditionObject  (id=678)
                        notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
                        putIndex 6954
                        takeIndex 6954
                factoryLock Object  (id=430)
                isDestroyed false
                managedBufferPools HashSet<E>  (id=431)
                memorySegmentSize 32768
                numTotalRequiredBuffers 3226
                totalNumberOfMemorySegments 10240
        numberOfRequestedMemorySegments 60
        numberOfRequiredMemorySegments 32
        owner null
        registeredListeners ArrayDeque<E>  (id=421)
                elements Object[16]  (id=685)
                head 0
                tail 0
askToRecycle false
isBlocking true
{noformat}

Second join stack trace and variable values from {SingleInputGate.getNextBufferOrEvent}:
{noformat}
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.parkNanos(Object, long) line: 215
AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467
SingleInputGate.getNextBufferOrEvent() line: 414
MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79
MutableRecordReader<T>.next(T) line: 34
ReaderIterator<T>.next(T) line: 59
MutableHashTable$ProbeIterator<PT>.next() line: 1581
MutableHashTable<BT,PT>.processProbeIter() line: 457
MutableHashTable<BT,PT>.nextRecord() line: 555
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
JoinDriver<IT1,IT2,OT>.run() line: 208
RegularPactTask<S,OT>.run() line: 489
RegularPactTask<S,OT>.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}

{noformat}
this SingleInputGate  (id=693)
        bufferPool LocalBufferPool  (id=706)
                availableMemorySegments ArrayDeque<E>  (id=716)
                        elements Object[16]  (id=717)
                        head 0
                        tail 0
                currentPoolSize 60
                isDestroyed false
                networkBufferPool NetworkBufferPool  (id=354)
                        allBufferPools HashSet<E>  (id=424)
                        availableMemorySegments ArrayBlockingQueue<E>  (id=427)
                                count 0
                                items Object[10240]  (id=674)
                                itrs null
                                lock ReentrantLock  (id=675)
                                notEmpty AbstractQueuedSynchronizer$ConditionObject  (id=678)
                                notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
                                putIndex 6954
                                takeIndex 6954
                        factoryLock Object  (id=430)
                        isDestroyed false
                        managedBufferPools HashSet<E>  (id=431)
                        memorySegmentSize 32768
                        numTotalRequiredBuffers 3226
                        totalNumberOfMemorySegments 10240
                numberOfRequestedMemorySegments 0
                numberOfRequiredMemorySegments 32
                owner null
                registeredListeners ArrayDeque<E>  (id=718)
        channelsWithEndOfPartitionEvents BitSet  (id=707)
        consumedResultId IntermediateDataSetID  (id=708)
        consumedSubpartitionIndex 24
        executionId ExecutionAttemptID  (id=709)
        hasReceivedAllEndOfPartitionEvents false
        inputChannels HashMap<K,V>  (id=710)
        inputChannelsWithData LinkedBlockingQueue<E>  (id=692)
                capacity 2147483647
                count AtomicInteger  (id=698)
                        value 0
                head LinkedBlockingQueue$Node<E>  (id=701)
                last LinkedBlockingQueue$Node<E>  (id=701)
                notEmpty AbstractQueuedSynchronizer$ConditionObject  (id=691)
                notFull AbstractQueuedSynchronizer$ConditionObject  (id=703)
                putLock ReentrantLock  (id=704)
                takeLock ReentrantLock  (id=705)
        isReleased false
        jobId JobID  (id=711)
        numberOfInputChannels 32
        numberOfUninitializedChannels 0
        owningTaskName "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
        partitionStateChecker NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)
        pendingEvents ArrayList<E>  (id=713)
        registeredListeners CopyOnWriteArrayList<E>  (id=714)
        requestedPartitionsFlag true
        requestLock Object  (id=715)
        retriggerLocalRequestTimer null
currentChannel null
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)