Avoiding "too many open files" during leftOuterJoin with Flink 1.11/batch - now with stack trace

Ken Krugler
Hi all,

Updated - I found the logging for the error, and it’s happening during a GroupReduce, which is running at the same time as the leftOuterJoin and the CoGroup.

Caused by: java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1600303450883_0004/flink-io-719a95fa-eca4-4ac4-b2c5-7799315b626d/87cb5c578a889080d681cc00fc11023b.000001.channel (Too many open files)
        at java.io.RandomAccessFile.open0(Native Method) ~[?:1.8.0_252]
        at java.io.RandomAccessFile.open(RandomAccessFile.java:316) ~[?:1.8.0_252]
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) ~[?:1.8.0_252]
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:124) ~[?:1.8.0_252]
        at org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:57) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.<init>(AsynchronousFileIOChannel.java:87) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.<init>(AsynchronousBlockReader.java:60) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBlockChannelReader(IOManagerAsync.java:224) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.io.disk.iomanager.IOManager.createBlockChannelReader(IOManager.java:182) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getMergingIterator(UnilateralSortMerger.java:1543) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1475) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

I’m curious whether increasing taskmanager.memory.segment-size would have an impact here. I assume it would still reduce the total number of open files used by the leftOuterJoin, and probably the CoGroup as well.
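
For concreteness, here’s a sketch of how that knob could be bumped for a local test (the 128 KB value is an assumption for illustration, not something from the job); on the YARN cluster the same key would go into flink-conf.yaml instead:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.configuration.TaskManagerOptions;

    // Raise the page size from the default 32 KB to 128 KB. Fewer, larger
    // segments should mean fewer spill-file channels in flight at once,
    // if the writer-per-segment theory below holds.
    Configuration conf = new Configuration();
    conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("128kb"));

    // createLocalEnvironment(conf) only affects local testing; on YARN, set
    //   taskmanager.memory.segment-size: 128kb
    // in flink-conf.yaml instead.
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);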

Thanks,

— Ken

==============================================================================================================

When I do a leftOuterJoin(stream, JoinHint.REPARTITION_SORT_MERGE), I’m running into an IOException caused by too many open files.
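
For reference, the shape of the join looks like this (a minimal sketch with toy inputs; the names and data are mine, not from the real job):

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Toy stand-ins for the real multi-TB inputs.
    DataSet<Tuple2<String, Integer>> left =
        env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
    DataSet<Tuple2<String, String>> right =
        env.fromElements(Tuple2.of("a", "x"));

    // REPARTITION_SORT_MERGE repartitions and sorts both inputs, which is
    // where the spilled sort files (and their file handles) come from.
    DataSet<Tuple2<String, String>> joined = left
        .leftOuterJoin(right, JoinHint.REPARTITION_SORT_MERGE)
        .where(0)
        .equalTo(0)
        .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>,
                               Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> join(Tuple2<String, Integer> l,
                                               Tuple2<String, String> r) {
                // In a left outer join, r is null when there's no match on the right.
                return Tuple2.of(l.f0, r == null ? "<none>" : r.f1);
            }
        });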

The slaves in my YARN cluster (each with 48 slots and 320 GB of memory) are currently set up with an open-files limit (ulimit -n) of 32767, so I really don’t want to crank this up much higher.

From perusing the code, I assume the issue is that SpillingBuffer.nextSegment() can open a writer per segment.

So if I’m joining a lot of data (e.g., several TB), I can wind up with more than 32K segments open for writing.
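
A quick back-of-the-envelope check (the 2 TiB figure is an assumption for illustration) shows how fast the segment count outruns a 32767-descriptor limit, if each spilled segment really does hold its own writer:

    // Rough arithmetic with assumed numbers, not figures from the actual job:
    long spilledBytes = 2L << 40;   // assume 2 TiB spilled on one TaskManager
    long segmentBytes = 32L << 10;  // default taskmanager.memory.segment-size = 32 KB
    long segments = spilledBytes / segmentBytes;
    System.out.println(segments);   // 67108864 -- vastly more than 32767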

Does that make sense? And is the best current solution (short of, say, using more but smaller servers to reduce the per-server load) to increase the taskmanager.memory.segment-size configuration setting from its 32K default to something larger, so I’d have fewer active segments?

If I do that, are there any known side effects I should worry about?

Thanks,

— Ken

PS - a CoGroup is happening at the same time, so it’s possible that’s also contributing (or is even the real source of the problem), but the leftOuterJoin failed first.

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr