Flink on Kubes -- issues

Ramya Ramamurthy
Hi,

My Flink jobs keep going down after running for about an hour, with the
exception below.
This is Flink 1.7 on Kubernetes, with checkpoints to Google Cloud Storage.

AsynchronousException{java.lang.Exception: Could not materialize
checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3,
_zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp,
reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) ->
Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select:
(sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 21
for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4,
_zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, ts)
-> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks ->
where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, ts)
-> time attribute: (ts) (5/6).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
        ... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.OutOfMemoryError: Java heap space
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
        ... 5 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272)
        ... 4 more



Any help here in understanding this would be highly appreciated.


Thanks.

Re: Flink on Kubes -- issues

Till Rohrmann
Hi Ramya,

It looks as if you should give your Flink pods, and also the Flink process
itself, a bit more memory, since the process fails with an out-of-memory
error. You could also try Flink's latest version, which comes with native
Kubernetes support.

Cheers,
Till


Re: Flink on Kubes -- issues

Ramya Ramamurthy
Thanks Till.
Actually, I have around 5GB pods for each TM, with only one slot per pod.
But the metrics I have pulled are as below, which is slightly confusing.
It says only ~50MB of heap is committed for the tasks. Could you point me to the right configuration to set?

Thanks
~Ramya.

[image: image.png]


Re: Flink on Kubes -- issues

Xintong Song
Hi Ramya,

Increasing the memory of your pod will not give you more JVM heap space.
You need to configure Flink so that it launches the JVM process with more
memory.

In Flink 1.7, this can be done by setting 'jobmanager.heap.size'
and 'taskmanager.heap.size' in your 'flink-conf.yaml'. Both default to
1024m.

Please also note that you should not set these two options as
large as your Kubernetes pod. Flink also has some off-heap
memory overhead, so the total memory consumed by the Flink processes might
be larger than configured. This may cause your pods to be killed by
Kubernetes for exceeding their memory limit.

In our experience, leaving around 20~25% of your pod memory for
such overhead is a good practice. In your case, that means
setting 'taskmanager.heap.size' to about 4GB. If RocksDB is used in your
workload, you may need to reserve even more off-heap memory.
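
The sizing rule above is simple arithmetic, sketched here in Java. This is only an illustration: the class and method names (HeapBudget, heapMb) are hypothetical and not part of Flink, and the 20% and 25% figures are the rule of thumb from this message rather than Flink settings.

```java
// Hypothetical helper, not part of Flink: estimates a value for
// 'taskmanager.heap.size' from the pod memory and an overhead percentage.
public class HeapBudget {

    // Returns the heap size in MB after reserving overheadPercent of the pod
    // memory for off-heap usage (integer arithmetic, rounds down).
    static long heapMb(long podMb, int overheadPercent) {
        if (overheadPercent < 0 || overheadPercent >= 100) {
            throw new IllegalArgumentException("overheadPercent must be in [0, 100)");
        }
        return podMb * (100 - overheadPercent) / 100;
    }

    public static void main(String[] args) {
        long podMb = 5 * 1024; // the 5GB pod from this thread
        System.out.println("20% overhead: " + heapMb(podMb, 20) + "m");
        System.out.println("25% overhead: " + heapMb(podMb, 25) + "m");
    }
}
```

For a 5GB pod this gives 4096m at 20% overhead and 3840m at 25%, which is where the figure of roughly 4GB comes from.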

Thank you~

Xintong Song




Re: Flink on Kubes -- issues

Ramya Ramamurthy
Hi Xintong,

Thanks for the quick response.

I have set my task manager memory to 1.5GB, but I still see the Heap
Committed metric at around 54MB or so. Why does this happen? Should I
configure any of the memory fraction options here?

Thanks.


Re: Flink on Kubes -- issues

Xintong Song
Do you still run into the "java.lang.OutOfMemoryError: Java heap space"?

If not, then you don't really need to worry about the committed memory.

It is the maximum that really matters. The committed memory should increase
automatically when it's needed.
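
The difference between committed and maximum heap can be checked directly, since the JVM exposes both through its standard management API. The following is a minimal sketch and not Flink code: the class name HeapReport is hypothetical, and it only reports on the JVM it runs in. That Flink's web UI shows the same quantities is an assumption here.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical utility: prints used, committed and maximum heap of this JVM.
public class HeapReport {

    static String describe(MemoryUsage heap) {
        long mb = 1024L * 1024L;
        // getMax() returns -1 when the maximum is undefined.
        String max = heap.getMax() < 0 ? "undefined" : (heap.getMax() / mb) + "MB";
        return "used=" + (heap.getUsed() / mb) + "MB"
             + " committed=" + (heap.getCommitted() / mb) + "MB"
             + " max=" + max;
    }

    public static void main(String[] args) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        System.out.println(describe(heap));
    }
}
```

A small committed value next to a large max means the JVM has not yet needed to grow the heap. An OutOfMemoryError for heap space is only raised once the maximum itself is exhausted.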

Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 2:24 PM Ramya Ramamurthy <[hidden email]> wrote:

> Hi Xintong,
>
> Thanks for the quick response.
>
> I have kept my task manager memory to be 1.5GB. But still seeing the Heap
> committed metric to be around 54MB or so. Why does this happen ? Should I
> configure any memory fraction configurations here ?
>
> Thanks.
>
> On Fri, Jun 12, 2020 at 10:58 AM Xintong Song <[hidden email]>
> wrote:
>
> > Hi Ramya,
> >
> > Increasing the memory of your pod will not give you more JVM heap space.
> > You will need to configure Flink so it launches the JVM process with more
> > memory.
> >
> > In Flink 1.7, this could be achieved by configuring
> 'jobmanager.heap.size'
> > & 'taskmanager.heap.size' in your 'flink-conf.yaml'. Both of them are by
> > default 1024m.
> >
> > Please also note that, you should not configure these two options two as
> > large as your Kubernetes pod. Because Flink may also have some off-heap
> > memory overhead, so the total memory consumed by the Flink processes
> might
> > be larger than configured. This may cause your pods getting killed by
> > Kubernetes due to memory exceeding.
> >
> > According to our experience, leaving around 20~25% of your pod memory for
> > such overhead might be a good practice. In your case, that means
> > configuring 'taskmanager.heap.size' to 4GB. If RocksDB is used in your
> > workload, you may need to further increase the off-heap memory size.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Fri, Jun 12, 2020 at 1:11 PM Ramya Ramamurthy <[hidden email]>
> > wrote:
> >
> > > Thanks Till.
> > > Actually, i have around 5GB pods for each TM, and each pod with only
> one
> > > slot.
> > > But the metrics i have pulled is as below, which is slightly confusing.
> > > It says only ~50MB of Heap is committed for the tasks. Would you be
> able
> > > to point me to the right configuration to be set.
> > >
> > > Thanks
> > > ~Ramya.
> > >
> > > [image: image.png]
> > >
> > > On Tue, Jun 9, 2020 at 3:12 PM Till Rohrmann <[hidden email]>
> > wrote:
> > >
> > >> Hi Ramya,
> > >>
> > >> it looks as if you should give your Flink pods and also the Flink
> > process
> > >> a
> > >> bit more memory as the process fails with an out of memory error. You
> > >> could
> > >> also try Flink's latest version which comes with native Kubernetes
> > >> support.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Tue, Jun 9, 2020 at 8:45 AM Ramya Ramamurthy <[hidden email]>
> > >> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > My flink jobs are constantly going down beyond an hour with the
> below
> > >> > exception.
> > >> > This is Flink 1.7 on kubes, with checkpoints to Google storage.
> > >> >
> > >> > AsynchronousException{java.lang.Exception: Could not materialize
> > >> > checkpoint 21 for operator Source: Kafka011TableSource(sid, _zpsbd3,
> > >> > _zpsbd4, _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp,
> > >> > reason, ts) -> from: (sid, _zpsbd3, _zpsbd6, ts) ->
> > >> > Timestamps/Watermarks -> where: (<>(sid, _UTF-16LE'7759')), select:
> > >> > (sid, _zpsbd3, _zpsbd6, ts) -> time attribute: (ts) (5/6).}
> > >> >         at
> > >> >
> > >>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> > >> >         at
> > >> >
> > >>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> > >> >         at
> > >> >
> > >>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> > >> >         at
> > >> >
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > >> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >> >         at
> > >> >
> > >>
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > >> >         at
> > >> >
> > >>
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > >> >         at java.lang.Thread.run(Thread.java:748)
> > >> > Caused by: java.lang.Exception: Could not materialize checkpoint 21
> > >> > for operator Source: Kafka011TableSource(sid, _zpsbd3, _zpsbd4,
> > >> > _zpsbd6, _zpsbd7, _zpsbd9, lvl_1, isBot, botcode, ssresp, reason, ts)
> > >> > -> from: (sid, _zpsbd3, _zpsbd6, ts) -> Timestamps/Watermarks ->
> > >> > where: (<>(sid, _UTF-16LE'7759')), select: (sid, _zpsbd3, _zpsbd6, ts)
> > >> > -> time attribute: (ts) (5/6).
> > >> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> > >> >         ... 6 more
> > >> > Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
> > >> >         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> > >> >         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> > >> >         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> > >> >         at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> > >> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> > >> >         ... 5 more
> > >> > Caused by: java.lang.OutOfMemoryError: Java heap space
> > >> >         at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:609)
> > >> >         at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:408)
> > >> >         at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
> > >> >         at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:558)
> > >> >         at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:482)
> > >> >         at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:599)
> > >> >         at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:272)
> > >> >         ... 4 more
> > >> >
> > >> > Any help here in understanding this would be highly appreciated.
> > >> >
> > >> > Thanks.

Re: Flink on Kubes -- issues

Xintong Song
BTW, the image you attached earlier cannot be displayed, so I assume you
are talking about the "Heap Committed" value displayed in Flink's web UI?
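
For context on that metric: the JVM reports "used", "committed" and "max"
heap as separate numbers. "Committed" is only what the JVM has currently
reserved from the OS, so a small value there does not by itself mean the
heap limit is small. Below is a minimal standalone sketch (not Flink code;
the class name HeapReport and the helper mb are made up for illustration)
that prints the three values through the standard java.lang.management API:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical helper class, for illustration only (not part of Flink).
class HeapReport {
    /** Formats a byte count as whole mebibytes; MemoryUsage uses -1 for "undefined". */
    static String mb(long bytes) {
        return bytes < 0 ? "undefined" : (bytes / (1024 * 1024)) + " MB";
    }

    public static void main(String[] args) {
        // Snapshot of the heap of the JVM this program runs in.
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        System.out.println("used      = " + mb(heap.getUsed()));      // currently occupied
        System.out.println("committed = " + mb(heap.getCommitted())); // reserved from the OS so far
        System.out.println("max       = " + mb(heap.getMax()));       // upper limit (-Xmx), may be undefined
    }
}
```

Run with something like `java -Xmx1024m HeapReport.java`, "max" should come
out close to the -Xmx value, while "committed" typically starts lower unless
-Xms is set to the same size.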

Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 2:30 PM Xintong Song <[hidden email]> wrote:

> Do you still run into the "java.lang.OutOfMemoryError: Java heap space"?
>
> If not, then you don't really need to worry about the committed memory.
>
> It is the maximum that really matters. The committed memory should
> increase automatically when it's needed.
>
> Thank you~
>
> Xintong Song

Re: Flink on Kubes -- issues

Ramya Ramamurthy
Yes, the image showed the Heap Committed metric.
I have not hit the out-of-memory issue again since changing the memory settings.

I now frequently get another error: org.apache.flink.util.FlinkException:
The assigned slot d9d4db5cc747bcbd374888d97e81945b_0 was removed.

When is this likely to happen?

Thanks,

~Ramya.


On Fri, Jun 12, 2020 at 12:03 PM Xintong Song <[hidden email]> wrote:

> BTW, the image you previously attached cannot be displayed. So I assume you
> are talking about the "Heap Committed" displayed on Flink's webui?
>
> Thank you~
>
> Xintong Song

Re: Flink on Kubes -- issues

Xintong Song
Usually that means the task manager hosting that slot was lost.
You will need to look into that task manager's log to find out what went
wrong.


Thank you~

Xintong Song



On Fri, Jun 12, 2020 at 4:13 PM Ramya Ramamurthy <[hidden email]> wrote:

> Yes ... the image was on Heap Committed metrics.
> And i have not yet faced this issue now, post changing the memory.
>
> I seem to get one more frequent error:
> org.apache.flink.util.FlinkException:
> The assigned slot d9d4db5cc747bcbd374888d97e81945b_0 was removed.
>
> When are we likely to get this ??
>
> Thanks,
>
> ~Ramya.