Avoiding "too many open files" during leftOuterJoin with Flink 1.11/batch


Ken Krugler
Hi all,

When I do a leftOuterJoin(stream, JoinHint.REPARTITION_SORT_MERGE), I’m running into an IOException caused by too many open files.

The slaves in my YARN cluster (each with 48 slots and 320 GB of memory) are currently set up with an open-file limit of 32767, so I really don’t want to crank this up much higher.

In perusing the code, I assume the issue is that SpillingBuffer.nextSegment() can open a writer per segment.

So if I have a lot of data (e.g. several TBs) that I’m joining, I can wind up with > 32K segments that are open for writing.

Does that make sense? And is the best current solution (short of, say, using more, smaller servers to reduce the per-server load) to increase the taskmanager.memory.segment-size configuration setting from 32K to something larger, so I’d have fewer active segments?

If I do that, any known side-effects I should worry about?

Thanks,

— Ken

PS - a CoGroup is happening at the same time, so it’s possible that’s also hurting (or maybe the real source of the problem), but the leftOuterJoin failed first.

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Avoiding "too many open files" during leftOuterJoin with Flink 1.11/batch

Chesnay Schepler-3
I don't think the segment-size will help here.

If I understand the code correctly, then we have a fixed number of
segments (# = memory/segment size), and if all segments are full we
spill _all_ current segments in memory to disk into a single file, and
re-use this file for future spilling until we stop spilling.

So, we do not create 1 file per segment, but 1 file each time we run
out of memory. And there should be at most 1 file per subtask
at any point.

As such, increasing the segment size shouldn't have any effect; you have
fewer segments, but the overall memory usage stays the same, and
spilling should occur just as often. More memory probably also will not
help, given that we will end up having to spill anyway with such a data
volume (I suppose).
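
To make the segment-size point concrete, here is a minimal Java sketch of the arithmetic described above (# segments = memory / segment size). The 1 GiB memory budget and the class and method names are hypothetical, chosen only for illustration; this is not Flink's actual code.

```java
public class SegmentMath {
    // Number of fixed-size segments that fit into a memory budget
    // (the "# = memory/segment size" relation from the message above).
    static long segmentCount(long memoryBytes, long segmentBytes) {
        return memoryBytes / segmentBytes;
    }

    public static void main(String[] args) {
        long memory = 1L << 30; // hypothetical 1 GiB budget for one operator
        for (long segment : new long[] {32 * 1024, 64 * 1024, 128 * 1024}) {
            long count = segmentCount(memory, segment);
            // Fewer segments, but count * segment size stays the same,
            // so the buffer fills (and spills) after the same amount of data.
            System.out.printf("segment=%dK segments=%d buffered=%d bytes%n",
                    segment / 1024, count, count * segment);
        }
    }
}
```

Doubling the segment size halves the segment count while the buffered byte total is unchanged, which is why spilling should occur just as often.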

Are there possibly other sources for files being created? (anything in
the user-code?)
Could it be something annoying like the buffers rapidly switching
between spilling/reading from memory, creating a new file on each spill,
overwhelming the OS?
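
One way to check for other descriptor sources is to ask the JVM how many file descriptors the process currently holds. The sketch below uses the JDK's `com.sun.management.UnixOperatingSystemMXBean`, which is only available on Unix-like JVMs; the class name `OpenFdProbe` is hypothetical. The count covers every descriptor in the process (files, sockets, pipes), not just spill files.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class OpenFdProbe {
    // Returns {open, max} descriptor counts for this JVM, or null when the
    // platform does not expose them (e.g. non-Unix JVMs).
    static long[] fdCounts() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] counts = fdCounts();
        if (counts == null) {
            System.out.println("File descriptor counts are not available on this JVM");
        } else {
            System.out.println("open=" + counts[0] + " max=" + counts[1]);
        }
    }
}
```

Logging these two numbers periodically from user code (for example from a function's setup method, as an assumption about where it would be convenient) would show whether the count climbs when a particular operator starts.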

On 9/17/2020 11:06 PM, Ken Krugler wrote:

> [...]


Re: Avoiding "too many open files" during leftOuterJoin with Flink 1.11/batch

Ken Krugler
Hi Chesnay,

Thanks, and you were right - it wasn’t a case of too many memory segments triggering too many open files.

It was a configuration issue with Elasticsearch clients being used by a custom function. This function just happened to start executing at the same time as the leftOuterJoin & CoGroup.

Each ES client has an HttpAsync connection pool with 30 connections, and these connections have a linger time.

Each connection requires 3 file descriptors (1 a_inode, 1 FIFO read, 1 FIFO write).

Each subtask uses 17 clients, writing to different ES indices.

Each TM has 24 slots.

So 24 * 17 * 30 * 3 = 36,720 file descriptors, which is slightly above the server's 32K max open files.
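
The same estimate as a small Java sketch, using only the numbers given in this message and the 32767 limit from the start of the thread. The class and method names are hypothetical, and the result is a worst case that assumes every pooled connection is open at once.

```java
public class FdEstimate {
    // Worst-case descriptors held by pooled connections on one TaskManager.
    static long pooledConnectionFds(int slots, int clientsPerSubtask,
                                    int connectionsPerClient, int fdsPerConnection) {
        return (long) slots * clientsPerSubtask * connectionsPerClient * fdsPerConnection;
    }

    public static void main(String[] args) {
        long limit = 32767; // open-file limit quoted earlier in the thread
        long needed = pooledConnectionFds(24, 17, 30, 3);
        // Prints: needed=36720 limit=32767 over=true
        System.out.println("needed=" + needed + " limit=" + limit
                + " over=" + (needed > limit));
    }
}
```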

— Ken

PS - what’s sad is a few months ago I’d written up a document for a client describing the need to tune their HttpClient connection pool, based on their Flink job’s parallelism…doh!
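
Turning the estimate around gives a rough way to size the pool from the job's parallelism. The sketch below computes the largest per-client pool size that fits a given descriptor budget; reserving half of the limit for everything else (spill files, network sockets, jars) is a purely hypothetical choice, as are the names. How the resulting value is applied depends on the HTTP client library in use and is not shown here.

```java
public class PoolBudget {
    // Largest per-client pool size that keeps pooled-connection descriptors
    // within fdBudget on one TaskManager.
    static int maxConnectionsPerClient(long fdBudget, int slots,
                                       int clientsPerSubtask, int fdsPerConnection) {
        long fdsPerPooledConnection = (long) slots * clientsPerSubtask * fdsPerConnection;
        return (int) (fdBudget / fdsPerPooledConnection);
    }

    public static void main(String[] args) {
        long limit = 32767;      // open-file limit from the thread
        long budget = limit / 2; // hypothetical: leave half for other descriptors
        // 24 slots, 17 clients per subtask, 3 descriptors per connection
        System.out.println("max connections per client: "
                + maxConnectionsPerClient(budget, 24, 17, 3));
    }
}
```

With these inputs the sketch yields 13 connections per client, compared with the 30 that pushed the TaskManager over its limit.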

> On Sep 18, 2020, at 2:31 AM, Chesnay Schepler wrote:
>
> [...]
