Memleak in the SessionWindowing example


Gábor Gévay
Hi,

At Ericsson, we are implementing something similar to what the
SessionWindowing example does:

There are events belonging to phone calls (sessions), and every event
has a call_id, which tells us which call it belongs to. At the end of
every call, a large event has to be emitted that contains some
aggregated information about the call. Furthermore, the events that
mark the end of the calls don't always reach our system, so the
sessions have to time out, just like in the example.

Therefore, I have experimented a bit with the SessionWindowing
example, and there is a problem: the trigger policy objects belonging
to already terminated sessions are kept in memory, and
NotifyOnLastGlobalElement still gets called on them. As a result, the
application uses more and more memory, and also gets slower over time.

I understand that Flink can't simply discard all state belonging
to empty groups, as it has no way of knowing whether the user-supplied
policy wants to trigger in the future (perhaps based on some state
collected before it first triggered).

Therefore, I propose the following addition to the API:
WindowedDataStream would get a method called something like
dropEmptyGroups, by which the user could tell Flink to automatically
discard all state belonging to a group as soon as its window becomes
empty.

The implementation could look like the following: dropEmptyGroups()
would set a flag. At the end of StreamDiscretizer.evict, if the flag
is true and bufferSize has just become 0, this StreamDiscretizer
would be removed from the groupedDiscretizers map of
GroupedStreamDiscretizer. For this, StreamDiscretizer would need a
new field, set at creation, that references the
GroupedStreamDiscretizer containing it. If an element later appeared
in a dropped group, GroupedStreamDiscretizer.makeNewGroup would simply
run again (although this won't happen in this example). What do you
think?
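
A minimal sketch of this idea follows. It uses simplified stand-in
classes (Discretizer, GroupedDiscretizer), not the actual Flink
classes; the String keys and elements, the evict(n) signature and the
public dropEmptyGroups field are assumptions made only for
illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the classes named above; NOT the real Flink code.
public class DropEmptyGroupsSketch {

    /** Per-group window buffer (stand-in for StreamDiscretizer). */
    static class Discretizer {
        final Deque<String> buffer = new ArrayDeque<>();
        final GroupedDiscretizer parent; // back-reference, set at creation
        final String key;

        Discretizer(GroupedDiscretizer parent, String key) {
            this.parent = parent;
            this.key = key;
        }

        void add(String element) {
            buffer.addLast(element);
        }

        /** Evicts up to n of the oldest elements from this group's buffer. */
        void evict(int n) {
            for (int i = 0; i < n && !buffer.isEmpty(); i++) {
                buffer.removeFirst();
            }
            // The proposed addition: once the buffer is empty, drop this group.
            if (parent.dropEmptyGroups && buffer.isEmpty()) {
                parent.groups.remove(key);
            }
        }
    }

    /** Holds one Discretizer per key (stand-in for GroupedStreamDiscretizer). */
    static class GroupedDiscretizer {
        final Map<String, Discretizer> groups = new HashMap<>();
        boolean dropEmptyGroups = false;

        void process(String key, String element) {
            // A dropped group is simply re-created if its key shows up again.
            groups.computeIfAbsent(key, k -> new Discretizer(this, k)).add(element);
        }

        void evict(String key, int n) {
            Discretizer d = groups.get(key);
            if (d != null) {
                d.evict(n);
            }
        }
    }

    public static void main(String[] args) {
        GroupedDiscretizer grouped = new GroupedDiscretizer();
        grouped.dropEmptyGroups = true;
        grouped.process("call-1", "start");
        grouped.process("call-1", "end");
        grouped.evict("call-1", 2);
        System.out.println("groups still held: " + grouped.groups.size());
    }
}
```

With the flag left at false, the map keeps one entry per key ever
seen, which is the growth described above.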

Best regards,
Gabor

Re: Memleak in the SessionWindowing example

Márton Balassi
Thanks for debugging this, Gabor; indeed a good catch.

I am not so sure about surfacing it in the API though - it seems very
specific to the session windowing case. I am also wondering whether
this should actually be the default behavior - if there are already empty
windows for a group, why not drop the previous states?

On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <[hidden email]> wrote:


Re: Memleak in the SessionWindowing example

Gyula Fóra
Hi,

Indeed a good catch, and a valid issue exactly because of the stateful
nature of the trigger and eviction policies.

I agree with the suggested approach that this should be configurable for
the discretizers (and could be set through the API).

As for the default behaviour, I am not 100% sure. It could be done in a
way that empty buffers (and the triggers and evictions associated with
them) don't get the NotifyOnLastGlobalElement call. That would reduce
the overhead.
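
As a rough sketch of what skipping the empty buffers could mean (the
Group class and the broadcast method are hypothetical stand-ins, not
the real Flink policy interfaces):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the real trigger/eviction policy classes differ.
public class SkipEmptyNotifySketch {

    static class Group {
        final List<String> buffer = new ArrayList<>();
        int notifications = 0;

        void notifyOnLastGlobalElement(String element) {
            notifications++; // a real policy would decide here whether to trigger
        }
    }

    /**
     * Passes the latest global element only to groups that still hold data.
     * Returns how many groups were notified.
     */
    static int broadcast(Iterable<Group> groups, String element) {
        int notified = 0;
        for (Group g : groups) {
            if (g.buffer.isEmpty()) {
                continue; // empty buffer: skip the call entirely
            }
            g.notifyOnLastGlobalElement(element);
            notified++;
        }
        return notified;
    }

    public static void main(String[] args) {
        Group active = new Group();
        active.buffer.add("event");
        Group finished = new Group(); // session already over, buffer empty
        List<Group> all = new ArrayList<>();
        all.add(active);
        all.add(finished);
        System.out.println("notified: " + broadcast(all, "next"));
    }
}
```

This only removes the per-element call overhead; the state of the
empty groups is still kept.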

Cheers,
Gyula

On Thu, May 28, 2015 at 3:48 PM, Márton Balassi <[hidden email]>
wrote:


Re: Memleak in the SessionWindowing example

Gábor Gévay
Hi,

I would vote for making it the default behaviour to drop all state for
empty groups, and for allowing a configuration option to restore the
current behaviour instead. This issue will probably get a paragraph in
the documentation, but if someone overlooks it, then there is potential
for a greater disaster with the current behaviour than with dropping:
- If someone expects the states to be preserved, then they will
probably notice immediately that something is wrong (because the logic
that requires the states will not work at all).
- However, if someone expects the states for empty groups to just
disappear (or doesn't even think about what happens with empty
groups), then they might only notice the memleak and slowdown later
(probably in production), which will be very annoying to debug at that
point.

Best regards,
Gabor



2015-05-28 19:23 GMT+02:00 Gyula Fóra <[hidden email]>:


Re: Memleak in the SessionWindowing example

Gyula Fóra
Let's not get all dramatic :D

If we don't call any methods on the empty groups we can still keep them
out of memory in persistent storage, with lazy checkpoint/state-access
logic and practically zero memory overhead.
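
A very rough sketch of such lazy state access (everything here is
hypothetical: the PolicyState class, the spill/access methods, and the
HashMap standing in for real persistent storage):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; a plain HashMap stands in for persistent storage.
public class LazyGroupStateSketch {

    /** Hypothetical per-group policy state. */
    static class PolicyState {
        int eventsSeen;
    }

    final Map<String, PolicyState> persistentStore = new HashMap<>();
    final Map<String, PolicyState> inMemory = new HashMap<>();

    /** When a group's window becomes empty, move its state out of memory. */
    void spill(String key) {
        PolicyState state = inMemory.remove(key);
        if (state != null) {
            persistentStore.put(key, state);
        }
    }

    /** When an element arrives, load the group's state lazily or create it. */
    PolicyState access(String key) {
        PolicyState state = inMemory.get(key);
        if (state == null) {
            state = persistentStore.remove(key);
            if (state == null) {
                state = new PolicyState();
            }
            inMemory.put(key, state);
        }
        return state;
    }

    public static void main(String[] args) {
        LazyGroupStateSketch sketch = new LazyGroupStateSketch();
        sketch.access("call-1").eventsSeen++;
        sketch.spill("call-1");
        System.out.println("in memory after spill: " + sketch.inMemory.size());
        System.out.println("events seen after reload: "
                + sketch.access("call-1").eventsSeen);
    }
}
```

The state survives, so policies that rely on it keep working, while
idle groups no longer occupy the heap.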

Automatically dropping everything will break a lot of programs without
people noticing.

On Thu, May 28, 2015 at 7:48 PM, Gábor Gévay <[hidden email]> wrote:


Re: Memleak in the SessionWindowing example

Gábor Gévay
> Let's not get all dramatic :D

Ok, sorry :D

> If we don't call any methods on the empty groups we can still keep them
> out of memory in persistent storage, with lazy checkpoint/state-access
> logic and practically zero memory overhead.

So you mean that whether to call notifyOnLastGlobalElement when the
window is empty would be a second configuration option? Or would this
not be configurable?

Best regards,
Gabor



2015-05-28 19:52 GMT+02:00 Gyula Fóra <[hidden email]>:


Re: Memleak in the SessionWindowing example

Robert Metzger
What is the status of this issue?
I think we should at least file a JIRA issue for it, so that it stays
around as a TODO.

On Thu, May 28, 2015 at 10:01 PM, Gábor Gévay <[hidden email]> wrote:


Re: Memleak in the SessionWindowing example

Gábor Gévay
I have now created the JIRA:
https://issues.apache.org/jira/browse/FLINK-2181

Best regards,
Gabor


2015-06-08 0:55 GMT+02:00 Robert Metzger <[hidden email]>:
