Hi,
My flink jobs are constantly going down beyond an hour with the below exception. This is Flink 1.7 on kubes, with checkpoints to Google storage. AsynchronousException{java.lang.Exception: Could not materialize checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).} at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Could not materialize checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) ... 6 more Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) ... 5 more Caused by: java.lang.OutOfMemoryError: Java heap space at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609) at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408) at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558) at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) ... 4 more Any help here in understanding this would be highly appreciated. Thanks. |
Hi Ramya,
it looks as if you should give your Flink pods and also the Flink process a bit more memory as the process fails with an out of memory error. You could also try Flink's latest version which comes with native Kubernetes support. Cheers, Till On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy <[hidden email]> wrote: > Hi, > > My flink jobs are constantly going down beyond an hour with the below > exception. > This is Flink 1.7 on kubes, with checkpoints to Google storage. > > AsynchronousException{java.lang.Exception: Could not materialize > checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3, > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select: > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 21 > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, ts) > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks -> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, ts) > -> time attribute: (ts) (5/6). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.OutOfMemoryError: Java heap space > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) > ... 5 more > Caused by: java.lang.OutOfMemoryError: Java heap space > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609) > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408) > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558) > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) > at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) > ... 4 more > > > > Any help here in understanding this would be highly appreciated. > > > Thanks. > |
Thanks Till. Actually, i have around 5GB pods for each TM, and each pod with only one slot. But the metrics i have pulled is as below, which is slightly confusing. It says only ~50MB of Heap is committed for the tasks. Would you be able to point me to the right configuration to be set. Thanks ~Ramya. On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann <[hidden email]> wrote: Hi Ramya, |
Hi Ramya,
Increasing the memory of your pod will not give you more JVM heap space. You will need to configure Flink so it launches the JVM process with more memory. In Flink 1.7, this could be achieved by configuring 'jobmanager.heap.size' & 'taskmanager.heap.size' in your 'flink-conf.yaml'. Both of them are by default 1024m. Please also note that, you should not configure these two options two as large as your Kubernetes pod. Because Flink may also have some off-heap memory overhead, so the total memory consumed by the Flink processes might be larger than configured. This may cause your pods getting killed by Kubernetes due to memory exceeding. According to our experience, leaving around 20~25% of your pod memory for such overhead might be a good practice. In your case, that means configuring 'taskmanager.heap.size' to 4GB. If RocksDB is used in your workload, you may need to further increase the off-heap memory size. Thank you~ Xintong Song On Fri, Jun 12, 2020 at 1:11 PM Ramya Ramamurthy <[hidden email]> wrote: > Thanks Till. > Actually, i have around 5GB pods for each TM, and each pod with only one > slot. > But the metrics i have pulled is as below, which is slightly confusing. > It says only ~50MB of Heap is committed for the tasks. Would you be able > to point me to the right configuration to be set. > > Thanks > ~Ramya. > > [image: image.png] > > On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann <[hidden email]> wrote: > >> Hi Ramya, >> >> it looks as if you should give your Flink pods and also the Flink process >> a >> bit more memory as the process fails with an out of memory error. You >> could >> also try Flink's latest version which comes with native Kubernetes >> support. >> >> Cheers, >> Till >> >> On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy <[hidden email]> >> wrote: >> >> > Hi, >> > >> > My flink jobs are constantly going down beyond an hour with the below >> > exception. >> > This is Flink 1.7 on kubes, with checkpoints to Google storage. >> > >> > AsynchronousException{java.lang.Exception: Could not materialize >> > checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3, >> > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, >> > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> >> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select: >> > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).} >> > at >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) >> > at >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) >> > at >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) >> > at >> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> > at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> > at >> > >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> > at >> > >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> > at java.lang.Thread.run(Thread.java:748) >> > Caused by: java.lang.Exception: Could not materialize checkpoint 21 >> > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, >> > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, ts) >> > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks -> >> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, ts) >> > -> time attribute: (ts) (5/6). >> > at >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) >> > ... 6 more >> > Caused by: java.util.concurrent.ExecutionException: >> > java.lang.OutOfMemoryError: Java heap space >> > at java.util.concurrent.FutureTask.report(FutureTask.java:122) >> > at java.util.concurrent.FutureTask.get(FutureTask.java:192) >> > at >> > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) >> > at >> > >> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) >> > at >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) >> > ... 5 more >> > Caused by: java.lang.OutOfMemoryError: Java heap space >> > at >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609) >> > at >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408) >> > at >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) >> > at >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558) >> > at >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) >> > at >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) >> > at >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) >> > ... 4 more >> > >> > >> > >> > Any help here in understanding this would be highly appreciated. >> > >> > >> > Thanks. >> > >> > |
Hi Xintong,
Thanks for the quick response. I have kept my task manager memory to be 1.5GB. But still seeing the Heap committed metric to be around 54MB or so. Why does this happen ? Should I configure any memory fraction configurations here ? Thanks. On Fri, Jun 12, 2020 at 10:58 AM Xintong Song <[hidden email]> wrote: > Hi Ramya, > > Increasing the memory of your pod will not give you more JVM heap space. > You will need to configure Flink so it launches the JVM process with more > memory. > > In Flink 1.7, this could be achieved by configuring 'jobmanager.heap.size' > & 'taskmanager.heap.size' in your 'flink-conf.yaml'. Both of them are by > default 1024m. > > Please also note that, you should not configure these two options two as > large as your Kubernetes pod. Because Flink may also have some off-heap > memory overhead, so the total memory consumed by the Flink processes might > be larger than configured. This may cause your pods getting killed by > Kubernetes due to memory exceeding. > > According to our experience, leaving around 20~25% of your pod memory for > such overhead might be a good practice. In your case, that means > configuring 'taskmanager.heap.size' to 4GB. If RocksDB is used in your > workload, you may need to further increase the off-heap memory size. > > Thank you~ > > Xintong Song > > > > On Fri, Jun 12, 2020 at 1:11 PM Ramya Ramamurthy <[hidden email]> > wrote: > > > Thanks Till. > > Actually, i have around 5GB pods for each TM, and each pod with only one > > slot. > > But the metrics i have pulled is as below, which is slightly confusing. > > It says only ~50MB of Heap is committed for the tasks. Would you be able > > to point me to the right configuration to be set. > > > > Thanks > > ~Ramya. > > > > [image: image.png] > > > > On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann <[hidden email]> > wrote: > > > >> Hi Ramya, > >> > >> it looks as if you should give your Flink pods and also the Flink > process > >> a > >> bit more memory as the process fails with an out of memory error. You > >> could > >> also try Flink's latest version which comes with native Kubernetes > >> support. > >> > >> Cheers, > >> Till > >> > >> On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy <[hidden email]> > >> wrote: > >> > >> > Hi, > >> > > >> > My flink jobs are constantly going down beyond an hour with the below > >> > exception. > >> > This is Flink 1.7 on kubes, with checkpoints to Google storage. > >> > > >> > AsynchronousException{java.lang.Exception: Could not materialize > >> > checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3, > >> > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, > >> > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> > >> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select: > >> > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).} > >> > at > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) > >> > at > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) > >> > at > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > >> > at > >> > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > >> > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > >> > at > >> > > >> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > >> > at > >> > > >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > >> > at java.lang.Thread.run(Thread.java:748) > >> > Caused by: java.lang.Exception: Could not materialize checkpoint 21 > >> > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, > >> > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, ts) > >> > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks -> > >> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, ts) > >> > -> time attribute: (ts) (5/6). > >> > at > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > >> > ... 6 more > >> > Caused by: java.util.concurrent.ExecutionException: > >> > java.lang.OutOfMemoryError: Java heap space > >> > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > >> > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > >> > at > >> > > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > >> > at > >> > > >> > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) > >> > at > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) > >> > ... 5 more > >> > Caused by: java.lang.OutOfMemoryError: Java heap space > >> > at > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609) > >> > at > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408) > >> > at > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) > >> > at > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558) > >> > at > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) > >> > at > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) > >> > at > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) > >> > ... 4 more > >> > > >> > > >> > > >> > Any help here in understanding this would be highly appreciated. > >> > > >> > > >> > Thanks. > >> > > >> > > > |
Do you still run into the "java.lang.OutOfMemoryError: Java heap space"?
If not, then you don't really need to worry about the committed memory. It is the maximum that really matters. The committed memory should increase automatically when it's needed. Thank you~ Xintong Song On Fri, Jun 12, 2020 at 2:24 PM Ramya Ramamurthy <[hidden email]> wrote: > Hi Xintong, > > Thanks for the quick response. > > I have kept my task manager memory to be 1.5GB. But still seeing the Heap > committed metric to be around 54MB or so. Why does this happen ? Should I > configure any memory fraction configurations here ? > > Thanks. > > On Fri, Jun 12, 2020 at 10:58 AM Xintong Song <[hidden email]> > wrote: > > > Hi Ramya, > > > > Increasing the memory of your pod will not give you more JVM heap space. > > You will need to configure Flink so it launches the JVM process with more > > memory. > > > > In Flink 1.7, this could be achieved by configuring > 'jobmanager.heap.size' > > & 'taskmanager.heap.size' in your 'flink-conf.yaml'. Both of them are by > > default 1024m. > > > > Please also note that, you should not configure these two options two as > > large as your Kubernetes pod. Because Flink may also have some off-heap > > memory overhead, so the total memory consumed by the Flink processes > might > > be larger than configured. This may cause your pods getting killed by > > Kubernetes due to memory exceeding. > > > > According to our experience, leaving around 20~25% of your pod memory for > > such overhead might be a good practice. In your case, that means > > configuring 'taskmanager.heap.size' to 4GB. If RocksDB is used in your > > workload, you may need to further increase the off-heap memory size. > > > > Thank you~ > > > > Xintong Song > > > > > > > > On Fri, Jun 12, 2020 at 1:11 PM Ramya Ramamurthy <[hidden email]> > > wrote: > > > > > Thanks Till. > > > Actually, i have around 5GB pods for each TM, and each pod with only > one > > > slot. > > > But the metrics i have pulled is as below, which is slightly confusing. > > > It says only ~50MB of Heap is committed for the tasks. Would you be > able > > > to point me to the right configuration to be set. > > > > > > Thanks > > > ~Ramya. > > > > > > [image: image.png] > > > > > > On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann <[hidden email]> > > wrote: > > > > > >> Hi Ramya, > > >> > > >> it looks as if you should give your Flink pods and also the Flink > > process > > >> a > > >> bit more memory as the process fails with an out of memory error. You > > >> could > > >> also try Flink's latest version which comes with native Kubernetes > > >> support. > > >> > > >> Cheers, > > >> Till > > >> > > >> On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy <[hidden email]> > > >> wrote: > > >> > > >> > Hi, > > >> > > > >> > My flink jobs are constantly going down beyond an hour with the > below > > >> > exception. > > >> > This is Flink 1.7 on kubes, with checkpoints to Google storage. > > >> > > > >> > AsynchronousException{java.lang.Exception: Could not materialize > > >> > checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3, > > >> > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, > > >> > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> > > >> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select: > > >> > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).} > > >> > at > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) > > >> > at > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) > > >> > at > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > > >> > at > > >> > > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > > >> > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > > >> > at > > >> > > > >> > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > >> > at > > >> > > > >> > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > >> > at java.lang.Thread.run(Thread.java:748) > > >> > Caused by: java.lang.Exception: Could not materialize checkpoint 21 > > >> > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, > > >> > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, > ts) > > >> > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks -> > > >> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, > ts) > > >> > -> time attribute: (ts) (5/6). > > >> > at > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > > >> > ... 6 more > > >> > Caused by: java.util.concurrent.ExecutionException: > > >> > java.lang.OutOfMemoryError: Java heap space > > >> > at > java.util.concurrent.FutureTask.report(FutureTask.java:122) > > >> > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > > >> > at > > >> > > > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > > >> > at > > >> > > > >> > > > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) > > >> > at > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) > > >> > ... 5 more > > >> > Caused by: java.lang.OutOfMemoryError: Java heap space > > >> > at > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609) > > >> > at > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408) > > >> > at > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) > > >> > at > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558) > > >> > at > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) > > >> > at > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) > > >> > at > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) > > >> > ... 4 more > > >> > > > >> > > > >> > > > >> > Any help here in understanding this would be highly appreciated. > > >> > > > >> > > > >> > Thanks. > > >> > > > >> > > > > > > |
BTW, the image you previously attached cannot be displayed. So I assume you
are talking about the "Heap Committed" displayed on Flink's webui? Thank you~ Xintong Song On Fri, Jun 12, 2020 at 2:30 PM Xintong Song <[hidden email]> wrote: > Do you still run into the "java.lang.OutOfMemoryError: Java heap space"? > > If not, then you don't really need to worry about the committed memory. > > It is the maximum that really matters. The committed memory should > increase automatically when it's needed. > > Thank you~ > > Xintong Song > > > > On Fri, Jun 12, 2020 at 2:24 PM Ramya Ramamurthy <[hidden email]> > wrote: > >> Hi Xintong, >> >> Thanks for the quick response. >> >> I have kept my task manager memory to be 1.5GB. But still seeing the Heap >> committed metric to be around 54MB or so. Why does this happen ? Should I >> configure any memory fraction configurations here ? >> >> Thanks. >> >> On Fri, Jun 12, 2020 at 10:58 AM Xintong Song <[hidden email]> >> wrote: >> >> > Hi Ramya, >> > >> > Increasing the memory of your pod will not give you more JVM heap space. >> > You will need to configure Flink so it launches the JVM process with >> more >> > memory. >> > >> > In Flink 1.7, this could be achieved by configuring >> 'jobmanager.heap.size' >> > & 'taskmanager.heap.size' in your 'flink-conf.yaml'. Both of them are by >> > default 1024m. >> > >> > Please also note that, you should not configure these two options two as >> > large as your Kubernetes pod. Because Flink may also have some off-heap >> > memory overhead, so the total memory consumed by the Flink processes >> might >> > be larger than configured. This may cause your pods getting killed by >> > Kubernetes due to memory exceeding. >> > >> > According to our experience, leaving around 20~25% of your pod memory >> for >> > such overhead might be a good practice. In your case, that means >> > configuring 'taskmanager.heap.size' to 4GB. If RocksDB is used in your >> > workload, you may need to further increase the off-heap memory size. >> > >> > Thank you~ >> > >> > Xintong Song >> > >> > >> > >> > On Fri, Jun 12, 2020 at 1:11 PM Ramya Ramamurthy <[hidden email]> >> > wrote: >> > >> > > Thanks Till. >> > > Actually, i have around 5GB pods for each TM, and each pod with only >> one >> > > slot. >> > > But the metrics i have pulled is as below, which is slightly >> confusing. >> > > It says only ~50MB of Heap is committed for the tasks. Would you be >> able >> > > to point me to the right configuration to be set. >> > > >> > > Thanks >> > > ~Ramya. >> > > >> > > [image: image.png] >> > > >> > > On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann <[hidden email]> >> > wrote: >> > > >> > >> Hi Ramya, >> > >> >> > >> it looks as if you should give your Flink pods and also the Flink >> > process >> > >> a >> > >> bit more memory as the process fails with an out of memory error. You >> > >> could >> > >> also try Flink's latest version which comes with native Kubernetes >> > >> support. >> > >> >> > >> Cheers, >> > >> Till >> > >> >> > >> On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy <[hidden email]> >> > >> wrote: >> > >> >> > >> > Hi, >> > >> > >> > >> > My flink jobs are constantly going down beyond an hour with the >> below >> > >> > exception. >> > >> > This is Flink 1.7 on kubes, with checkpoints to Google storage. >> > >> > >> > >> > AsynchronousException{java.lang.Exception: Could not materialize >> > >> > checkpoint 21 for operator Source: Kafka011TableSource(sid, >> _zpsbd3, >> > >> > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, >> > >> > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> >> > >> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select: >> > >> > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).} >> > >> > at >> > >> > >> > >> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) >> > >> > at >> > >> > >> > >> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) >> > >> > at >> > >> > >> > >> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) >> > >> > at >> > >> > >> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> > >> > at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> > >> > at >> > >> > >> > >> >> > >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> > >> > at >> > >> > >> > >> >> > >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> > >> > at java.lang.Thread.run(Thread.java:748) >> > >> > Caused by: java.lang.Exception: Could not materialize checkpoint 21 >> > >> > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, >> > >> > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, >> ts) >> > >> > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks -> >> > >> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, >> ts) >> > >> > -> time attribute: (ts) (5/6). >> > >> > at >> > >> > >> > >> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) >> > >> > ... 6 more >> > >> > Caused by: java.util.concurrent.ExecutionException: >> > >> > java.lang.OutOfMemoryError: Java heap space >> > >> > at >> java.util.concurrent.FutureTask.report(FutureTask.java:122) >> > >> > at java.util.concurrent.FutureTask.get(FutureTask.java:192) >> > >> > at >> > >> > >> > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) >> > >> > at >> > >> > >> > >> >> > >> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) >> > >> > at >> > >> > >> > >> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) >> > >> > ... 5 more >> > >> > Caused by: java.lang.OutOfMemoryError: Java heap space >> > >> > at >> > >> > >> > >> >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609) >> > >> > at >> > >> > >> > >> >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408) >> > >> > at >> > >> > >> > >> >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) >> > >> > at >> > >> > >> > >> >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558) >> > >> > at >> > >> > >> > >> >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) >> > >> > at >> > >> > >> > >> >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) >> > >> > at >> > >> > >> > >> >> > >> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) >> > >> > ... 4 more >> > >> > >> > >> > >> > >> > >> > >> > Any help here in understanding this would be highly appreciated. >> > >> > >> > >> > >> > >> > Thanks. >> > >> > >> > >> >> > > >> > >> > |
Yes ... the image was on Heap Committed metrics.
And i have not yet faced this issue now, post changing the memory. I seem to get one more frequent error: org.apache.flink.util.FlinkException: The assigned slot d9d4db5cc747bcbd374888d97e81945b_0 was removed. When are we likely to get this ?? Thanks, ~Ramya. On Fri, Jun 12, 2020 at 12:03 PM Xintong Song <[hidden email]> wrote: > BTW, the image you previously attached cannot be displayed. So I assume you > are talking about the "Heap Committed" displayed on Flink's webui? > > Thank you~ > > Xintong Song > > > > On Fri, Jun 12, 2020 at 2:30 PM Xintong Song <[hidden email]> > wrote: > > > Do you still run into the "java.lang.OutOfMemoryError: Java heap space"? > > > > If not, then you don't really need to worry about the committed memory. > > > > It is the maximum that really matters. The committed memory should > > increase automatically when it's needed. > > > > Thank you~ > > > > Xintong Song > > > > > > > > On Fri, Jun 12, 2020 at 2:24 PM Ramya Ramamurthy <[hidden email]> > > wrote: > > > >> Hi Xintong, > >> > >> Thanks for the quick response. > >> > >> I have kept my task manager memory to be 1.5GB. But still seeing the > Heap > >> committed metric to be around 54MB or so. Why does this happen ? Should > I > >> configure any memory fraction configurations here ? > >> > >> Thanks. > >> > >> On Fri, Jun 12, 2020 at 10:58 AM Xintong Song <[hidden email]> > >> wrote: > >> > >> > Hi Ramya, > >> > > >> > Increasing the memory of your pod will not give you more JVM heap > space. > >> > You will need to configure Flink so it launches the JVM process with > >> more > >> > memory. > >> > > >> > In Flink 1.7, this could be achieved by configuring > >> 'jobmanager.heap.size' > >> > & 'taskmanager.heap.size' in your 'flink-conf.yaml'. Both of them are > by > >> > default 1024m. > >> > > >> > Please also note that, you should not configure these two options two > as > >> > large as your Kubernetes pod. Because Flink may also have some > off-heap > >> > memory overhead, so the total memory consumed by the Flink processes > >> might > >> > be larger than configured. This may cause your pods getting killed by > >> > Kubernetes due to memory exceeding. > >> > > >> > According to our experience, leaving around 20~25% of your pod memory > >> for > >> > such overhead might be a good practice. In your case, that means > >> > configuring 'taskmanager.heap.size' to 4GB. If RocksDB is used in your > >> > workload, you may need to further increase the off-heap memory size. > >> > > >> > Thank you~ > >> > > >> > Xintong Song > >> > > >> > > >> > > >> > On Fri, Jun 12, 2020 at 1:11 PM Ramya Ramamurthy <[hidden email]> > >> > wrote: > >> > > >> > > Thanks Till. > >> > > Actually, i have around 5GB pods for each TM, and each pod with only > >> one > >> > > slot. > >> > > But the metrics i have pulled is as below, which is slightly > >> confusing. > >> > > It says only ~50MB of Heap is committed for the tasks. Would you be > >> able > >> > > to point me to the right configuration to be set. > >> > > > >> > > Thanks > >> > > ~Ramya. > >> > > > >> > > [image: image.png] > >> > > > >> > > On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann <[hidden email]> > >> > wrote: > >> > > > >> > >> Hi Ramya, > >> > >> > >> > >> it looks as if you should give your Flink pods and also the Flink > >> > process > >> > >> a > >> > >> bit more memory as the process fails with an out of memory error. > You > >> > >> could > >> > >> also try Flink's latest version which comes with native Kubernetes > >> > >> support. > >> > >> > >> > >> Cheers, > >> > >> Till > >> > >> > >> > >> On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy <[hidden email] > > > >> > >> wrote: > >> > >> > >> > >> > Hi, > >> > >> > > >> > >> > My flink jobs are constantly going down beyond an hour with the > >> below > >> > >> > exception. > >> > >> > This is Flink 1.7 on kubes, with checkpoints to Google storage. > >> > >> > > >> > >> > AsynchronousException{java.lang.Exception: Could not materialize > >> > >> > checkpoint 21 for operator Source: Kafka011TableSource(sid, > >> _zpsbd3, > >> > >> > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, > ssresp, > >> > >> > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> > >> > >> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), > select: > >> > >> > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).} > >> > >> > at > >> > >> > > >> > >> > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > >> > >> > at > >> > >> > > >> > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > >> > >> > at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > >> > >> > at java.lang.Thread.run(Thread.java:748) > >> > >> > Caused by: java.lang.Exception: Could not materialize checkpoint > 21 > >> > >> > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, > >> > >> > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, > >> ts) > >> > >> > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks -> > >> > >> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, > _zpsbd6, > >> ts) > >> > >> > -> time attribute: (ts) (5/6). > >> > >> > at > >> > >> > > >> > >> > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > >> > >> > ... 6 more > >> > >> > Caused by: java.util.concurrent.ExecutionException: > >> > >> > java.lang.OutOfMemoryError: Java heap space > >> > >> > at > >> java.util.concurrent.FutureTask.report(FutureTask.java:122) > >> > >> > at > java.util.concurrent.FutureTask.get(FutureTask.java:192) > >> > >> > at > >> > >> > > >> > > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) > >> > >> > ... 5 more > >> > >> > Caused by: java.lang.OutOfMemoryError: Java heap space > >> > >> > at > >> > >> > > >> > >> > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) > >> > >> > at > >> > >> > > >> > >> > >> > > >> > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) > >> > >> > ... 4 more > >> > >> > > >> > >> > > >> > >> > > >> > >> > Any help here in understanding this would be highly appreciated. > >> > >> > > >> > >> > > >> > >> > Thanks. > >> > >> > > >> > >> > >> > > > >> > > >> > > > |
Usually that means the remote task manager, where the slot locates, is lost.
You will need to look into the log of that task manager to find out what's wrong with it. Thank you~ Xintong Song On Fri, Jun 12, 2020 at 4:13 PM Ramya Ramamurthy <[hidden email]> wrote: > Yes ... the image was on Heap Committed metrics. > And i have not yet faced this issue now, post changing the memory. > > I seem to get one more frequent error: > org.apache.flink.util.FlinkException: > The assigned slot d9d4db5cc747bcbd374888d97e81945b_0 was removed. > > When are we likely to get this ?? > > Thanks, > > ~Ramya. > > > On Fri, Jun 12, 2020 at 12:03 PM Xintong Song <[hidden email]> > wrote: > > > BTW, the image you previously attached cannot be displayed. So I assume > you > > are talking about the "Heap Committed" displayed on Flink's webui? > > > > Thank you~ > > > > Xintong Song > > > > > > > > On Fri, Jun 12, 2020 at 2:30 PM Xintong Song <[hidden email]> > > wrote: > > > > > Do you still run into the "java.lang.OutOfMemoryError: Java heap > space"? > > > > > > If not, then you don't really need to worry about the committed memory. > > > > > > It is the maximum that really matters. The committed memory should > > > increase automatically when it's needed. > > > > > > Thank you~ > > > > > > Xintong Song > > > > > > > > > > > > On Fri, Jun 12, 2020 at 2:24 PM Ramya Ramamurthy <[hidden email]> > > > wrote: > > > > > >> Hi Xintong, > > >> > > >> Thanks for the quick response. > > >> > > >> I have kept my task manager memory to be 1.5GB. But still seeing the > > Heap > > >> committed metric to be around 54MB or so. Why does this happen ? > Should > > I > > >> configure any memory fraction configurations here ? > > >> > > >> Thanks. > > >> > > >> On Fri, Jun 12, 2020 at 10:58 AM Xintong Song <[hidden email]> > > >> wrote: > > >> > > >> > Hi Ramya, > > >> > > > >> > Increasing the memory of your pod will not give you more JVM heap > > space. > > >> > You will need to configure Flink so it launches the JVM process with > > >> more > > >> > memory. > > >> > > > >> > In Flink 1.7, this could be achieved by configuring > > >> 'jobmanager.heap.size' > > >> > & 'taskmanager.heap.size' in your 'flink-conf.yaml'. Both of them > are > > by > > >> > default 1024m. > > >> > > > >> > Please also note that, you should not configure these two options > two > > as > > >> > large as your Kubernetes pod. Because Flink may also have some > > off-heap > > >> > memory overhead, so the total memory consumed by the Flink processes > > >> might > > >> > be larger than configured. This may cause your pods getting killed > by > > >> > Kubernetes due to memory exceeding. > > >> > > > >> > According to our experience, leaving around 20~25% of your pod > memory > > >> for > > >> > such overhead might be a good practice. In your case, that means > > >> > configuring 'taskmanager.heap.size' to 4GB. If RocksDB is used in > your > > >> > workload, you may need to further increase the off-heap memory size. > > >> > > > >> > Thank you~ > > >> > > > >> > Xintong Song > > >> > > > >> > > > >> > > > >> > On Fri, Jun 12, 2020 at 1:11 PM Ramya Ramamurthy <[hidden email] > > > > >> > wrote: > > >> > > > >> > > Thanks Till. > > >> > > Actually, i have around 5GB pods for each TM, and each pod with > only > > >> one > > >> > > slot. > > >> > > But the metrics i have pulled is as below, which is slightly > > >> confusing. > > >> > > It says only ~50MB of Heap is committed for the tasks. Would you > be > > >> able > > >> > > to point me to the right configuration to be set. > > >> > > > > >> > > Thanks > > >> > > ~Ramya. > > >> > > > > >> > > [image: image.png] > > >> > > > > >> > > On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann < > [hidden email]> > > >> > wrote: > > >> > > > > >> > >> Hi Ramya, > > >> > >> > > >> > >> it looks as if you should give your Flink pods and also the Flink > > >> > process > > >> > >> a > > >> > >> bit more memory as the process fails with an out of memory error. > > You > > >> > >> could > > >> > >> also try Flink's latest version which comes with native > Kubernetes > > >> > >> support. > > >> > >> > > >> > >> Cheers, > > >> > >> Till > > >> > >> > > >> > >> On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy < > [hidden email] > > > > > >> > >> wrote: > > >> > >> > > >> > >> > Hi, > > >> > >> > > > >> > >> > My flink jobs are constantly going down beyond an hour with the > > >> below > > >> > >> > exception. > > >> > >> > This is Flink 1.7 on kubes, with checkpoints to Google storage. > > >> > >> > > > >> > >> > AsynchronousException{java.lang.Exception: Could not > materialize > > >> > >> > checkpoint 21 for operator Source: Kafka011TableSource(sid, > > >> _zpsbd3, > > >> > >> > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, > > ssresp, > > >> > >> > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) -> > > >> > >> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), > > select: > > >> > >> > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).} > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > > >> > >> > at > > >> > >> > > > >> > > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > > >> > >> > at > > java.util.concurrent.FutureTask.run(FutureTask.java:266) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > >> > >> > at java.lang.Thread.run(Thread.java:748) > > >> > >> > Caused by: java.lang.Exception: Could not materialize > checkpoint > > 21 > > >> > >> > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4, > > >> > >> > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, > reason, > > >> ts) > > >> > >> > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks > -> > > >> > >> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, > > _zpsbd6, > > >> ts) > > >> > >> > -> time attribute: (ts) (5/6). > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > > >> > >> > ... 6 more > > >> > >> > Caused by: java.util.concurrent.ExecutionException: > > >> > >> > java.lang.OutOfMemoryError: Java heap space > > >> > >> > at > > >> java.util.concurrent.FutureTask.report(FutureTask.java:122) > > >> > >> > at > > java.util.concurrent.FutureTask.get(FutureTask.java:192) > > >> > >> > at > > >> > >> > > > >> > > > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853) > > >> > >> > ... 5 more > > >> > >> > Caused by: java.lang.OutOfMemoryError: Java heap space > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599) > > >> > >> > at > > >> > >> > > > >> > >> > > >> > > > >> > > > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272) > > >> > >> > ... 4 more > > >> > >> > > > >> > >> > > > >> > >> > > > >> > >> > Any help here in understanding this would be highly > appreciated. > > >> > >> > > > >> > >> > > > >> > >> > Thanks. > > >> > >> > > > >> > >> > > >> > > > > >> > > > >> > > > > > > |
Free forum by Nabble | Edit this page |