Serious memory leak in DefaultOperatorStateBackend

Gyula Fóra
Hi all,

We have discovered a fairly serious memory leak in
DefaultOperatorStateBackend with broadcast (union) list states.

The problem seems to occur when the name of a broadcast state is changed in
order to intentionally drop some state.

Flink does not drop the "garbage" broadcast state. It keeps snapshotting and
rebroadcasting it, so it multiplies exponentially with every savepoint/restore
cycle.

With high enough parallelism this can easily cause small states (a few
bytes) to grow to several gigabytes or more over a few restarts, eventually
leading to a very bad crash/restart cycle in which the TMs run out of memory
within a few seconds.
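
To put rough numbers on this growth, here is a minimal back-of-the-envelope sketch in Java. It assumes a simplified model rather than anything taken from the Flink code: on restore, every subtask receives the union of all subtasks' data, and on the next savepoint every subtask writes its whole copy back, so the total multiplies by the parallelism on each cycle. The class name, the method name and the numbers are hypothetical.

```java
import java.math.BigInteger;

public class UnionStateGrowth {

    /**
     * Total bytes held across all subtasks after the given number of
     * savepoint/restore cycles, under the simplified model that each
     * cycle hands every subtask the union of all subtasks' data.
     */
    public static BigInteger totalBytesAfter(long bytesPerSubtask, int parallelism, int cycles) {
        BigInteger p = BigInteger.valueOf(parallelism);
        // Before any restore: each subtask holds only its own small piece.
        BigInteger total = BigInteger.valueOf(bytesPerSubtask).multiply(p);
        // Each cycle: all p subtasks end up holding the previous total.
        return total.multiply(p.pow(cycles));
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 16 bytes per subtask, parallelism 100.
        for (int cycles = 0; cycles <= 4; cycles++) {
            System.out.println(cycles + " cycles: " + totalBytesAfter(16, 100, cycles) + " bytes");
        }
    }
}
```

With these assumed numbers, the total goes from 1,600 bytes to 1.6 billion bytes after three cycles, which matches the order of magnitude described above.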

Basically two things seem to be missing: garbage collection of unreferenced
operator states (they are eagerly restored into memory), and probably lazy
restore, which would also be nice :)

We run Flink 1.4.0, but 1.4.1 seems to be affected as well. We haven't
checked the latest master.

Could someone please confirm that this behaviour is not as intended?

Cheers,
Gyula

Re: Serious memory leak in DefaultOperatorStateBackend

Stefan Richter

Hi,

I don’t think that this is a bug, but rather a necessity that comes with the (imo questionable) design of allowing lazy state registration. In this design, just because a state is *currently* not registered does not mean that you can simply drop it. Imagine that your code did *not yet* re-register a state, but could still do so in the future. If a checkpoint/recovery happened in between, all data for that state would suddenly be lost, just because by chance the state was not registered "fast enough". As I see it, the proper way is to register the state under the same name and clear it if you want to get rid of the data. There is currently no call that explicitly drops a state that was once declared, and you might make a case that this is a feature to have in the future. Then again, we need a general decision about lazy and eager state IMO.
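
The reasoning above can be illustrated with a small toy model in plain Java. It is only a sketch under stated assumptions: `ToyBackend`, `register` and `snapshot` are made-up names, not Flink classes or methods, and the real backend is more involved. The model keeps every restored state in each snapshot because user code may still register it later, and it shows that registering under the old name and clearing is what actually removes the data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LazyRegistrationModel {

    /** Toy stand-in for an operator state backend; not Flink's real class. */
    static class ToyBackend {
        // Everything restored from the last snapshot, registered or not.
        private final Map<String, List<String>> states = new HashMap<>();

        ToyBackend(Map<String, List<String>> restored) {
            restored.forEach((name, values) -> states.put(name, new ArrayList<>(values)));
        }

        /** Lazy registration: user code may call this at any time after restore. */
        List<String> register(String name) {
            return states.computeIfAbsent(name, n -> new ArrayList<>());
        }

        /** Keeps every known state, since unregistered ones may still be claimed later. */
        Map<String, List<String>> snapshot() {
            Map<String, List<String>> copy = new HashMap<>();
            states.forEach((name, values) -> copy.put(name, new ArrayList<>(values)));
            return copy;
        }
    }

    private static ToyBackend restoredBackend() {
        Map<String, List<String>> restored = new HashMap<>();
        restored.put("old-name", Arrays.asList("a", "b", "c"));
        return new ToyBackend(restored);
    }

    /** Renaming: "old-name" is never registered again, so its data survives. */
    public static int sizeAfterRename() {
        ToyBackend backend = restoredBackend();
        backend.register("new-name");
        return backend.snapshot().get("old-name").size();
    }

    /** Suggested approach: register under the same name, then clear. */
    public static int sizeAfterRegisterAndClear() {
        ToyBackend backend = restoredBackend();
        backend.register("old-name").clear();
        return backend.snapshot().get("old-name").size();
    }

    public static void main(String[] args) {
        System.out.println("after rename: " + sizeAfterRename());
        System.out.println("after register + clear: " + sizeAfterRegisterAndClear());
    }
}
```

Note that even in the second case the name itself stays known to the toy backend; only the data is gone.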

Best,
Stefan
 



Re: Serious memory leak in DefaultOperatorStateBackend

Gyula Fóra
Do you have any suggestion for how to completely delete an operator state or
a keyed state?
For operator state this seems to be easy enough, but what about completely
dropping a keyed state?

Gyula


Re: Serious memory leak in DefaultOperatorStateBackend

Stefan Richter
Right now, I don’t think there is a way of doing that. I don’t think there is anything fundamentally against having a method that drops a state completely, both data and registered metadata. But so far that has never existed, and it seems nobody ever needed it (or at least asked for it). The closest thing for keyed state is calling the "clear()" method, but the metadata will stick.



Re: Serious memory leak in DefaultOperatorStateBackend

Stefan Richter
I had a quick look at it, and we could do that, even for RocksDB: the method would do a metadata lookup similar to what state registration does, remove the metadata, and drop the column family. But until then, there is currently no way to completely drop a keyed state.
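
The three steps mentioned here (metadata lookup, metadata removal, dropping the column family) can be sketched with a toy model in plain Java. This is an assumption-laden illustration, not the Flink or RocksDB API: `ToyKeyedBackend` and all of its methods are hypothetical, and plain maps stand in for the registered metadata and the column families. It contrasts a per-key clear, which leaves the metadata in place, with a full drop.

```java
import java.util.HashMap;
import java.util.Map;

public class DropKeyedStateModel {

    /** Toy keyed backend: plain maps stand in for metadata and column families. */
    static class ToyKeyedBackend {
        // state name -> registered metadata (here just a serializer label)
        private final Map<String, String> metadata = new HashMap<>();
        // state name -> "column family" holding key -> value
        private final Map<String, Map<String, String>> columnFamilies = new HashMap<>();

        void register(String stateName, String serializerLabel) {
            metadata.putIfAbsent(stateName, serializerLabel);
            columnFamilies.computeIfAbsent(stateName, n -> new HashMap<>());
        }

        void put(String stateName, String key, String value) {
            columnFamilies.get(stateName).put(key, value);
        }

        /** Per-key clear: removes one value, the metadata stays. */
        void clear(String stateName, String key) {
            columnFamilies.get(stateName).remove(key);
        }

        /** Hypothetical drop: look up the metadata, remove it, drop the column family. */
        boolean drop(String stateName) {
            if (!metadata.containsKey(stateName)) {
                return false; // nothing registered under that name
            }
            metadata.remove(stateName);
            columnFamilies.remove(stateName);
            return true;
        }

        boolean isRegistered(String stateName) {
            return metadata.containsKey(stateName);
        }
    }

    private static ToyKeyedBackend backendWithOneEntry() {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        backend.register("counts", "long-serializer");
        backend.put("counts", "user-1", "42");
        return backend;
    }

    public static boolean registeredAfterClear() {
        ToyKeyedBackend backend = backendWithOneEntry();
        backend.clear("counts", "user-1");
        return backend.isRegistered("counts");
    }

    public static boolean registeredAfterDrop() {
        ToyKeyedBackend backend = backendWithOneEntry();
        backend.drop("counts");
        return backend.isRegistered("counts");
    }

    public static void main(String[] args) {
        System.out.println("registered after clear: " + registeredAfterClear());
        System.out.println("registered after drop: " + registeredAfterDrop());
    }
}
```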



Re: Serious memory leak in DefaultOperatorStateBackend

Gyula Fóra
Hi,
I think one improvement that could be made to avoid this situation is that
broadcast state should probably not be rebroadcast if it was not read.

The problem here is that our program logic ensures that this growth of
state does not occur when it accesses the state, but since we don't do that
after changing the name, it just explodes through the rebroadcasts.
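
To make the difference concrete, here is a small simulation in plain Java. It is only one possible reading of the idea, under the assumption that unread state could be handed back to the subtasks partition by partition instead of as a union. `RebroadcastModel` and its methods are hypothetical names, and nothing here reflects how Flink is actually implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RebroadcastModel {

    /** Union redistribution: every subtask receives all entries of all subtasks. */
    public static List<List<String>> restoreWithUnion(List<List<String>> perSubtask) {
        List<String> union = new ArrayList<>();
        for (List<String> part : perSubtask) {
            union.addAll(part);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < perSubtask.size(); i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    /** Assumed alternative: state that nobody read is passed through unchanged. */
    public static List<List<String>> restoreUnread(List<List<String>> perSubtask) {
        List<List<String>> restored = new ArrayList<>();
        for (List<String> part : perSubtask) {
            restored.add(new ArrayList<>(part));
        }
        return restored;
    }

    /** Total number of entries across all subtasks after some savepoint/restore cycles. */
    public static int totalAfterCycles(int parallelism, int cycles, boolean rebroadcastUnread) {
        List<List<String>> state = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            // Each subtask starts with a single entry.
            state.add(new ArrayList<>(Collections.singletonList("entry-" + i)));
        }
        for (int c = 0; c < cycles; c++) {
            state = rebroadcastUnread ? restoreWithUnion(state) : restoreUnread(state);
        }
        int total = 0;
        for (List<String> part : state) {
            total += part.size();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("rebroadcast unread state: " + totalAfterCycles(4, 3, true));
        System.out.println("pass unread state through: " + totalAfterCycles(4, 3, false));
    }
}
```

In this model, four subtasks with one entry each end up with 256 entries in total after three cycles when unread state is rebroadcast, and stay at 4 entries when it is passed through.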

What do you think?

Gyula
