Re: Out of Memory Error-Heap when storing parquet files using Flink Table API (Flink version-1.12.0) in Google Cloud Storage


Re: Out of Memory Error-Heap when storing parquet files using Flink Table API (Flink version-1.12.0) in Google Cloud Storage

Sivaraman Venkataraman, Aswin Ram
Hi Everyone,
Hope you are doing well. We are currently using the Flink Table API (Flink version 1.12.0) to stream data from Kafka and store it in Google Cloud Storage. The file format we use to store the data is Parquet. Initially the Flink job worked perfectly fine and we were able to stream data and store it successfully in Google Cloud Storage. But once we increased the cardinality of the input data and also increased the volume of data sent to Kafka, i.e. streamed more events per second, the Flink job started throwing the following errors:

1. GC overhead limit exceeded
2. OutOfMemoryError: Java heap space

We tried running Flink on a Kubernetes cluster and Flink on YARN. In both cases, as the volume of data increased, we saw the above errors.
We provided two task managers with 10 GB each and 1 GB for the job manager. The checkpoint interval for our Flink job is 3 minutes. I am aware that a bug has been filed in Flink for this: https://issues.apache.org/jira/browse/FLINK-20945.
Please let me know if there is a way to solve this issue and when the JIRA bug FLINK-20945 might be resolved. We are trying to do a test run with some of our customers, and this is a production blocker for us.

Regards
Aswin


From: Sivaraman Venkataraman, Aswin Ram <[hidden email]>
Date: Monday, February 15, 2021 at 12:15 AM
To: [hidden email] <[hidden email]>
Cc: Sivaraman Venkataraman, Aswin Ram <[hidden email]>
Subject: Out of Memory Error-Heap when storing parquet files using Flink Table API (Flink version-1.12.0) in Google Cloud Storage
Hi Everyone,
We are currently using the Flink Table API (Flink version 1.12.0) to stream data from Kafka and store it in Google Cloud Storage. The file format we use to store the data is Parquet. Initially the Flink job worked perfectly fine and we were able to stream data and store it successfully in Google Cloud Storage. But once we increased the cardinality of the input data and also increased the rate at which data is produced to Kafka, i.e. streamed more events per second, the Flink job started throwing the following errors:

1. GC overhead limit exceeded
2. OutOfMemoryError: Java heap space

Initially I provided 4 GB each to the Job Manager and the Task Manager. I started Flink's YARN session with the following command:
./bin/yarn-session.sh -jm 4096m -tm 4096m -s 3

One thing we noticed was that increasing the memory provided to the Job Manager and the Task Manager made a difference. I restarted the YARN session for Flink with the following parameters (the equivalent flink-conf.yaml settings are sketched after the command):
./bin/yarn-session.sh -jm 10240m -tm 10240m -s 3
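For reference, roughly the same sizing can also be expressed in flink-conf.yaml instead of on the yarn-session command line. This is only a sketch: the values simply mirror the -jm/-tm/-s flags above, and the split between heap and managed memory is left to Flink's defaults.

# flink-conf.yaml (sketch; mirrors the yarn-session flags above)
jobmanager.memory.process.size: 10240m
taskmanager.memory.process.size: 10240m
taskmanager.numberOfTaskSlots: 3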

With the increased memory, the error was no longer thrown by the Flink job. The question I have is: other than increasing the JVM heap size, is there any other way in Flink to prevent this JVM heap out-of-memory error?
I am aware that Flink, while writing Parquet files, buffers the data in memory before flushing it to disk. So when we increased the cardinality of the data, more data may have been buffered in memory, causing this error. Additionally, the checkpoint interval for our Flink job is 3 minutes.
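My understanding, which may well be wrong, is that each active bucket keeps its own Parquet writer with its own in-memory buffer, and that with bulk formats the part files are only rolled on checkpoints, so both a higher key cardinality and a longer checkpoint interval increase how much data sits in memory. One knob we are considering is a shorter checkpoint interval; the value below is only illustrative:

# flink-conf.yaml (sketch; shorter than our current 3-minute interval)
execution.checkpointing.interval: 60s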
Please let me know if you need any more information to further understand the problem.

Regards
Aswin

Re: Out of Memory Error-Heap when storing parquet files using Flink Table API (Flink version-1.12.0) in Google Cloud Storage

Xintong Song
Hi Aswin,

Do you mean that the job purely moves data from Kafka to GCS? Is there any processing on the data other than reformatting, e.g., aggregation over a window?

If there's barely any processing, you might want to try the following
things.
- Increase the number of concurrent GC threads
- Disable the GC overhead limit

To be specific, try adding '-XX:ParallelGCThreads=<num-of-threads> -XX:-UseGCOverheadLimit' to 'env.java.opts.taskmanager' in your configuration. As for the value of '<num-of-threads>', I'd suggest 4 or 8.
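For example, with 8 GC threads (just an illustrative choice), the entry in flink-conf.yaml would look roughly like this:

env.java.opts.taskmanager: -XX:ParallelGCThreads=8 -XX:-UseGCOverheadLimit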

I'm suggesting this because I suspect the problem is caused by insufficient
GC speed rather than insufficient heap space.
- Usually, for a Java program, it is expected that the program's working threads take most of the CPU time. Therefore, the JVM does not create many GC threads (by default, as many as there are CPU cores when there are no more than 8 cores), and it throws an OOM error when the GC threads use more time than the working threads.
- For a Flink job that purely reads in and writes out data, objects are frequently created as the input data is deserialized, and soon become unneeded. Consequently, there is more work for the GC than for the working threads.
- This usually indicates that an object-reuse approach is needed to reduce the GC pressure. However, that is neither the current state nor easy to achieve for many connectors.
- As far as I can see, the most feasible workaround at the moment is to live with the GC pressure and give more CPU time to the GC threads.

Thank you~

Xintong Song


