[Discuss] Semantics of event time for state TTL

[Discuss] Semantics of event time for state TTL

Andrey Zagrebin-3
Hi All,

As you might have already seen, there is an effort tracked in FLINK-12005
[1] to support an event time scale for state with time-to-live (TTL) [2].
While thinking about the design, we realised that there can be multiple
options for the semantics of this feature, depending on the use case. There
is also sometimes confusion because of the out-of-order nature of event
time in Flink. I am starting this thread to discuss potential use cases of
this feature and their requirements with interested users and developers.
There was already a discussion thread asking about event time for TTL, and
it contains some thoughts [3].

There are two places where the TTL feature uses time at the moment.
Firstly, we store the timestamp of the state's last access/update.
Secondly, we compare this timestamp with a current timestamp to check
expiration and garbage-collect the state at some point later.

At the moment, Flink supports *only processing time* for both timestamps:
the state's *last-access timestamp and the current timestamp*. Both are
basically the current local system Unix epoch time.
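
For reference, today's processing-time TTL is configured per state
descriptor. A minimal sketch against the current API (the TTL duration and
state name are just examples):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// TTL of 7 days, measured in processing time (the only supported scale today).
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    // refresh the stored last-access timestamp on creation and writes
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    // never return a value whose TTL has passed, even if not yet cleaned up
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> descriptor =
    new ValueStateDescriptor<>("last-event", String.class);
descriptor.enableTimeToLive(ttlConfig);
```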

When it comes to an event time scale, we also need to define what Flink
should use for these two timestamps. Below I list some options and their
possible pros and cons for discussion. There might be more depending on the
use case.

*Last access timestamp (stored in the backend with the actual state value):*

   - *Event timestamp of the record currently being processed.* This seems
   to be the simplest option, and it allows user-defined timestamps in the
   state backend. The problem here is the instability of event time, which
   can not only increase but also decrease if records come out of order.
   This can lead to rewriting the state timestamp to a smaller value, which
   is unnatural for the notion of time.
   - *Max event timestamp of records seen so far for this record key.* This
   option is similar to the previous one, but it fixes the notion of time to
   make it always increasing. Maintaining this timestamp also has
   performance implications, because the previous timestamp needs to be read
   out to decide whether to overwrite it (see the sketch after this list).
   - *Last emitted watermark.* This is what we usually use to trigger other
   time-based actions in Flink, like timers and windows, but it can be
   unrelated to the record that actually triggers the state update.
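
To make the cost of the max-timestamp option concrete, here is a minimal
sketch of the read-modify-write it implies (the class and method names are
illustrative, not an actual backend internal):

```java
import org.apache.flink.api.common.state.ValueState;

class MaxTimestampUpdate {
    // Unlike a blind overwrite, the stored timestamp must be read first,
    // which adds a state read to every update.
    static void updateLastAccess(ValueState<Long> tsState, long recordEventTs)
            throws Exception {
        Long stored = tsState.value();        // extra read on every access
        if (stored == null || recordEventTs > stored) {
            tsState.update(recordEventTs);    // write only if time advanced
        }
    }
}
```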

*Current timestamp to check expiration:*

   - *Event timestamp of the last processed record.* Again quite simple, but
   an unpredictable option for out-of-order events. It can potentially lead
   to undesirable expiration of late data buffered in state, with no control
   for the user.
   - *Max event timestamp of records seen so far in the operator backend.*
   Again similar to the previous one and more stable, but the user still
   does not have much control over when state expires.
   - *Last emitted watermark.* Again, this is what we usually use to trigger
   other time-based actions in Flink, like timers and windows. It also gives
   the user some control over when state expires (up to which point in event
   time) by emitting a certain watermark. It is more flexible but also more
   complicated: if a watermark emitting strategy is already used for other
   operations, it might be suboptimal for TTL and delay state cleanup.
   - *Current processing time.* This option is quite simple: the user
   decides which timestamp to store, but it expires in real time. For the
   data privacy use case this might be better, because we want state to
   become unavailable at a particular real moment in time after the
   associated piece of data was created in event time. For long-term
   approximate garbage collection it might not be a problem either. For
   quick expiration, however, the skew between event and processing time can
   again lead to premature deletion of late data, and the user cannot delay
   it.

We could also make this behaviour configurable. Another option is to make
the time provider pluggable for users. The interface could give users
context (the currently processed record, the watermark, etc.) and ask them
which timestamp to use. This is more complicated, though.
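
To make the pluggable-provider idea concrete, a hypothetical interface
could look roughly like this (all names are illustrative; nothing like this
exists in Flink today):

```java
// Hypothetical user-facing hook: both methods receive whatever context the
// runtime can cheaply provide and return the timestamp the TTL machinery
// should use.
public interface TtlTimestampProvider<T> {

    // Timestamp to store with the state value on access/update.
    long lastAccessTimestamp(T currentRecord, long currentWatermark,
                             long processingTime);

    // "Current" timestamp against which expiration is checked.
    long expirationCheckTimestamp(long currentWatermark, long processingTime);
}
```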

Looking forward to your feedback.

Best,
Andrey

[1] https://issues.apache.org/jira/browse/FLINK-12005
[2]
https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-TTL-in-Flink-1-6-0-td22509.html

Re: [Discuss] Semantics of event time for state TTL

Elias Levy
My 2c:

Timestamp stored with the state value: Event timestamp
Timestamp used to check expiration: Last emitted watermark

That follows the event time processing model used elsewhere in Flink. E.g.
events are segregated into windows based on their event time, but the
windows do not fire until the watermark advances past the end of the window.
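
In other words, the expiration check under this combination would mirror
the window-firing rule. Roughly (an illustrative sketch, not an actual
backend internal):

```java
class EventTimeTtlCheck {
    // Window analogy: a window fires once the watermark passes its end;
    // here, state expires once the watermark passes storedTs + ttl.
    static boolean isExpired(long storedEventTs, long ttlMillis, long watermark) {
        return watermark >= storedEventTs + ttlMillis;
    }
}
```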



Re: [Discuss] Semantics of event time for state TTL

Konstantin Knauf-3
Hi Andrey,

I agree with Elias. This would be the most natural behavior. I wouldn't add
additional, slightly different notions of time to Flink.

As I can also see a use case for the combination

* Timestamp stored: Event timestamp
* Timestamp to check expiration: Processing Time

we could (maybe in a second step) add the possibility to mix and match time
characteristics for both aspects.
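
If the two aspects became independently configurable, the TTL builder might
grow along these lines (a purely hypothetical sketch: the TimeScale enum
and the two setters below do not exist in Flink today; imports as in the
earlier StateTtlConfig example):

```java
// Hypothetical mix-and-match configuration; only processing time is
// supported by StateTtlConfig at the time of writing.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(30))
    .setLastAccessTimestamp(TimeScale.EVENT_TIME)            // hypothetical
    .setExpirationCheckTimestamp(TimeScale.PROCESSING_TIME)  // hypothetical
    .build();
```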

Cheers,

Konstantin



Re: [Discuss] Semantics of event time for state TTL

Aljoscha Krettek-2
Oh boy, this is an interesting pickle.

For *last-access-timestamp*, I think only *event-time-of-current-record* makes sense. I’m looking at this from a GDPR/regulatory compliance perspective. If you update a state, by say storing the event you just received in state, you want to use the exact timestamp of that event to track expiration. Both *max-timestamp-of-data-seen-so-far* and *last-watermark* suffer from problems in edge cases: if the timestamp of an event you receive is quite a bit earlier than other timestamps we have seen so far (i.e. the event is late), we would artificially lengthen the TTL of that event (which is stored in state) and would therefore break regulatory requirements. Always using the timestamp of the event itself doesn’t suffer from that problem.
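
A quick worked example of that lengthening (the numbers are made up):

```java
class LateEventTtl {
    public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000;
        long ttlMs = 30 * dayMs;            // retention rule: 30 days
        long eventTs = 1_554_000_000_000L;  // event time of a late record
        long maxSeen = eventTs + 5 * dayMs; // max timestamp seen is 5 days ahead

        // Storing eventTs: the record expires 30 days after it happened.
        // Storing maxSeen: it lives 35 days after it happened -- 5 days
        // longer than the retention rule allows.
        System.out.println("expiry from eventTs: " + (eventTs + ttlMs));
        System.out.println("expiry from maxSeen: " + (maxSeen + ttlMs));
    }
}
```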

For *expiration-check-time*, both *last-watermark* and *current-processing-time* could make sense, but I’m leaning towards *processing-time*. The reason is again the GDPR/compliance view: if we have an old savepoint with data that should have been expired by now but we re-process it with *last-watermark* expiration, we will get to “see” that state even though we shouldn’t be allowed to. If we use *current-processing-time* for expiration, we wouldn’t have that problem, because that old data (according to its event-time timestamp) would be properly cleaned up and access would be prevented.

To sum up:
last-access-timestamp: event-time of event
expiration-check-time: processing-time
 
What do you think?

Aljoscha


Re: [Discuss] Semantics of event time for state TTL

Kostas Kloudas-4
Hi all,

For GDPR: I am not sure about the regulatory requirements of GDPR, but I
would assume that the time for deletion starts counting from the time an
organisation received the data (i.e. the wall-clock ingestion time of the
data), and not the "event time" of the data. Otherwise, an organisation may
be violating GDPR just by receiving, e.g., 1-year-old data of a user whose
deletion policy is "you are allowed to keep them for 6 months".

Now for the discussion in this thread, I think that the scenario:

* Timestamp stored: Event timestamp
* Timestamp to check expiration: Processing Time

has the underlying assumption that there is a relationship between
event time and processing time, which is not necessarily the case.
Event time, although we call it "time", is just another user-defined column
or attribute of the data and can be anything. It is not an "objective",
independently evolving attribute like wall-clock time. I am not sure what
the solution could be, as out-of-orderness can always lead to arbitrary,
non-reproducible and difficult-to-debug behaviour (e.g. a super-early
element that arrives out of order and, as the succeeding elements set the
timestamp to lower values, gets deleted by the state backend although the
user-level windowing logic would expect it to be there; see the sketch
below).
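
A minimal simulation of that edge case (illustrative numbers; it assumes
the last-access timestamp is the event timestamp of the current record and
expiration is checked against the latest record's timestamp):

```java
class OutOfOrderTtlLoss {
    public static void main(String[] args) {
        long ttl = 10;
        long lastAccess = Long.MIN_VALUE;
        long[] eventTs = {1000, 20, 21, 22}; // 1000 is the "super-early" element

        for (long ts : eventTs) {
            lastAccess = ts; // option: timestamp of the current record
        }

        long checkTs = 40; // e.g. event timestamp of the next record
        boolean expired = checkTs >= lastAccess + ttl;
        // expired == true: the whole entry, including the element with
        // timestamp 1000, is dropped although its window has not fired yet.
        System.out.println("expired: " + expired);
    }
}
```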

Given that last point, and apart from the semantics of the proposed
feature, I think that we should also discuss whether it is a good idea to
have event-time TTL implemented at the state backend level in the first
place. Personally, I am not so convinced that this is a good idea, as we
would introduce another (potentially competing) mechanism for handling
event time, apart from the user program. An example is the one I described
above. This also defeats one of the main advantages of event time, in my
opinion, which is reproducibility of the results.

I may be wrong, but I would appreciate any opinions on this.

Cheers,
Kostas


Re: [Discuss] Semantics of event time for state TTL

Aljoscha Krettek-2
I had a discussion with Andrey and now think that the
event-time-timestamp/watermark-cleanup case is also valid: you may not need
this for regulatory compliance but just for cleaning up old state, e.g. in
cases where you re-process old data.

I think the discussion about whether to have this in the backends is also
good to have. I’d say it’s good to have it in the backends because this
 (1) decreases state size: for user timers, a timer entry is basically a
<key, timestamp>, whereas with backend TTL it’s only the timestamp
 (2) can piggyback on log compaction in RocksDB: a user timer manually has
to go to state and delete it, which can be costly, while TTL in the backend
happens as we go (see the sketch below for the timer-based alternative).
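
For contrast, the user-timer approach being compared against here looks
roughly like the following (a sketch assuming a simple last-access scheme
and event-time timestamps set on every record; names are illustrative):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Manual event-time expiry: one timer entry (<key, timestamp>) per update on
// top of the state itself, plus an explicit delete when the timer fires.
class TimerBasedTtl extends KeyedProcessFunction<String, String, String> {

    private static final long TTL_MS = 7L * 24 * 60 * 60 * 1000;
    private transient ValueState<Long> lastAccess;

    @Override
    public void open(Configuration parameters) {
        lastAccess = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-access", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        long ts = ctx.timestamp(); // event time of the record (assumed set)
        lastAccess.update(ts);
        ctx.timerService().registerEventTimeTimer(ts + TTL_MS);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        Long last = lastAccess.value();
        if (last != null && timestamp >= last + TTL_MS) {
            lastAccess.clear(); // the costly explicit delete
        }
    }
}
```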

Aljoscha


Re: [Discuss] Semantics of event time for state TTL

Elias Levy
Hasn't this always been the end goal?  It's certainly what we have been
waiting on for jobs with very large TTL'd state. Beyond timer storage,
timer processing simply to expire stale data that may not otherwise be
accessed is expensive.

On Mon, Apr 8, 2019 at 7:11 AM Aljoscha Krettek <[hidden email]> wrote:

> I had a discussion with Andrey and now think that also the case
> event-time-timestamp/watermark-cleanup is a valid case. If you don’t need
> this for regulatory compliance but just for cleaning up old state, in case
> where you have re-processing of old data.
>
> I think the discussion about whether to have this in the backends is also
> good to have: I’d say it’s good to have it in the backends because this
>  (1) decreases state size, for user timers a timer entry is basically a
> <key, timestamp> whereas if we use backend TTL it’s only the timestamp
>  (2) can piggyback on log compaction in RocksDB. A user-time manually has
> to go to state and delete it, which can be costly, while TTL in the backend
> would happen as-we-go
>
> Aljoscha
>
> On 8. Apr 2019, at 12:03, Kostas Kloudas <[hidden email]> wrote:
>
> Hi all,
>
> For GDPR: I am not sure about the regulatory requirements of GDPR but I
> would assume that the time for deletion starts counting from the time an
> organisation received the data (i.e. the wall-clock ingestion time of the
> data), and not the "event time" of the data. In other case, an organisaton
> may be violating GDPR by just receiving e.g. 1 year old data of a user
> whole deletion policy is "you are allowed to keep them for 6 months".
>
> Now for the discussion in this thread, I think that the scenario:
>
> * Timestamp stored: Event timestamp
> * Timestamp to check expiration: Processing Time
>
> has the underlying assumption that there is a relationship between
> event-time and processing time, which is not necessarily the case.
> Event-time, although we call it "time", is just another user-defined column
> or attribute of the data and can be anything. It is not an "objective" and
> independently evolving attribute like wall-clock time. I am not sure what
> could be the solution, as out-of-orderness can always lead to arbitrary,
> non-reproducible and difficult to debug behaviour (e.g. a super-early
> element that arrives out-of-order and, as the succeeding elements set the
> timestamp to lower values, it gets deleted by the state backend, although
> the user-level windowing logic would expect it to be there).
>
> Given that last point made above, and apart from the semantics of the
> proposed feature, I think that we should also discuss if it is a good idea
> to have event time TTL implemented in state backend level in the first
> place. Personally, I am not so convinced that this is a good idea, as we
> introduce another (potentially competing) mechanism for handling event
> time, apart from the user program. An example can be the one that I
> described above. And this also defeats one of the main advantages of event
> time, in my opinion, which is reproducability of the results.
>
> I may be wrong, but I would appreciate any opinions on this.
>
> Cheers,
> Kostas
>
> On Mon, Apr 8, 2019 at 11:12 AM Aljoscha Krettek <[hidden email]>
> wrote:
>
>> Oh boy, this is an interesting pickle.
>>
>> For *last-access-timestamp*, I think only *event-time-of-current-record*
>> makes sense. I’m looking at this from a GDPR/regulatory compliance
>> perspective. If you update a state, by say storing the event you just
>> received in state, you want to use the exact timestamp of that event to to
>> expiration. Both *max-timestamp-of-data-seen-so-far* and *last-watermark*
>> suffer from problems in edge cases: if the timestamp of an event you
>> receive is quite a bit earlier than other timestamps that we have seen so
>> far (i.e. the event is late) we would artificially lengthen the TTL of that
>> event (which is stored in state) and would therefore break regulatory
>> requirements. Always using the timestamp of an event doesn’t suffer from
>> that problem.
>>
>> For *expiration-check-time*, both *last-watermark* and
>> *current-processing-time* could make sense but I’m leaning towards
>> *processing-time*. The reason is again the GDPR/compliance view: if we have
>> an old savepoint with data that should have been expired by now but we
>> re-process it with *last-watermark* expiration, this means that we will get
>> to “see” that state even though we shouldn’t allowed to be. If we use
>> *current-processing-time* for expiration, we wouldn’t have that problem
>> because that old data (according to their event-time timestamp) would be
>> properly cleaned up and access would be prevented.
>>
>> To sum up:
>> last-access-timestamp: event-time of event
>> expiration-check-time: processing-time
>>
>> What do you think?
>>
>> Aljoscha
>>
>> > On 6. Apr 2019, at 01:30, Konstantin Knauf <[hidden email]>
>> wrote:
>> >
>> > Hi Andrey,
>> >
>> > I agree with Elias. This would be the most natural behavior. I wouldn't
>> add
>> > additional slightly different notions of time to Flink.
>> >
>> > As I can also see a use case for the combination
>> >
>> > * Timestamp stored: Event timestamp
>> > * Timestamp to check expiration: Processing Time
>> >
>> > we could (maybe in a second step) add the possibility to mix and match
>> time
>> > characteristics for both aspects.
>> >
>> > Cheers,
>> >
>> > Konstantin
>> >
>> > On Thu, Apr 4, 2019 at 7:59 PM Elias Levy <[hidden email]>
>> > wrote:
>> >
>> >> My 2c:
>> >>
>> >> Timestamp stored with the state value: Event timestamp
>> >> Timestamp used to check expiration: Last emitted watermark
>> >>
>> >> That follows the event time processing model used elsewhere is Flink.
>> >> E.g. events are segregated into windows based on their event time, but
>> the
>> >> windows do not fire until the watermark advances past the end of the
>> window.
>> >>
>> >>
>> >> On Thu, Apr 4, 2019 at 7:55 AM Andrey Zagrebin <[hidden email]>
>> >> wrote:
>> >>
>> >>> Hi All,
>> >>>
>> >>> As you might have already seen there is an effort tracked in
>> FLINK-12005
>> >>> [1] to support event time scale for state with time-to-live (TTL) [2].
>> >>> While thinking about design, we realised that there can be multiple
>> >>> options
>> >>> for semantics of this feature, depending on use case. There is also
>> >>> sometimes confusion because of event time out-of-order nature in
>> Flink. I
>> >>> am starting this thread to discuss potential use cases of this
>> feature and
>> >>> their requirements for interested users and developers. There was
>> already
>> >>> discussion thread asking about event time for TTL and it already
>> contains
>> >>> some thoughts [3].
>> >>>
>> >>> There are two semantical cases where we use time for TTL feature at
>> the
>> >>> moment. Firstly, we store timestamp of state last access/update.
>> Secondly,
>> >>> we use this timestamp and current timestamp to check expiration and
>> >>> garbage
>> >>> collect state at some point later.
>> >>>
>> >>> At the moment, Flink supports *only processing time* for both
>> timestamps:
>> >>> state *last access and current timestamp*. It is basically current
>> local
>> >>> system unix epoch time.
>> >>>
>> >>> When it comes to event time scale, we also need to define what Flink
>> >>> should
>> >>> use for these two timestamps. Here I will list some options and their
>> >>> possible pros&cons for discussion. There might be more depending on
>> use
>> >>> case.

Re: [Discuss] Semantics of event time for state TTL

Aljoscha Krettek-2
I think so; I just wanted to bring it up again because the question was raised.

> On 8. Apr 2019, at 22:56, Elias Levy <[hidden email]> wrote:
>
> Hasn't this always been the end goal?  It's certainly what we have been
> waiting on for jobs with very large TTLed state.  Beyond timer storage,
> timer processing simply to expire stale data that may not otherwise be
> accessed is expensive.
>
> On Mon, Apr 8, 2019 at 7:11 AM Aljoscha Krettek <[hidden email]> wrote:
>
>> I had a discussion with Andrey and now think that the
>> event-time-timestamp/watermark-cleanup case is also valid: if you don’t
>> need this for regulatory compliance but just want to clean up old state,
>> for example when you re-process old data.
>>
>> I think the discussion about whether to have this in the backends is also
>> worth having: I’d say it’s good to have it in the backends because it
>> (1) decreases state size: a user timer entry is basically a
>> <key, timestamp>, whereas with backend TTL it’s only the timestamp, and
>> (2) can piggyback on compaction in RocksDB: a user timer has to manually
>> go to state and delete it, which can be costly, while TTL in the backend
>> happens as-we-go.
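>>
>> For reference, today’s processing-time TTL already does this style of
>> as-we-go cleanup via a RocksDB compaction filter. A minimal sketch against
>> the Flink 1.8 API (exact builder methods and config keys may differ
>> across versions):
>>
>>     import org.apache.flink.api.common.state.StateTtlConfig;
>>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>>     import org.apache.flink.api.common.time.Time;
>>
>>     StateTtlConfig ttlConfig = StateTtlConfig
>>         .newBuilder(Time.days(7))
>>         // refresh the stored timestamp on every create and write
>>         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>         // drop expired entries as RocksDB compacts, instead of via timers;
>>         // in 1.8 the filter also has to be switched on via
>>         // state.backend.rocksdb.ttl.compaction.filter.enabled
>>         .cleanupInRocksdbCompactFilter()
>>         .build();
>>
>>     ValueStateDescriptor<String> descriptor =
>>         new ValueStateDescriptor<>("last-event", String.class);
>>     descriptor.enableTimeToLive(ttlConfig);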
>>
>> Aljoscha
>>
>> On 8. Apr 2019, at 12:03, Kostas Kloudas <[hidden email]> wrote:
>>
>> Hi all,
>>
>> For GDPR: I am not sure about the regulatory requirements of GDPR but I
>> would assume that the time for deletion starts counting from the time an
>> organisation received the data (i.e. the wall-clock ingestion time of the
>> data), and not the "event time" of the data. Otherwise, an organisation
>> may be violating GDPR just by receiving e.g. 1 year old data of a user
>> whose deletion policy is "you are allowed to keep them for 6 months".
>>
>> Now for the discussion in this thread, I think that the scenario:
>>
>> * Timestamp stored: Event timestamp
>> * Timestamp to check expiration: Processing Time
>>
>> has the underlying assumption that there is a relationship between
>> event time and processing time, which is not necessarily the case.
>> Event time, although we call it "time", is just another user-defined
>> column or attribute of the data and can be anything. It is not an
>> "objective" and independently evolving attribute like wall-clock time. I
>> am not sure what the solution could be, as out-of-orderness can always
>> lead to arbitrary, non-reproducible and difficult-to-debug behaviour
>> (e.g. a super-early element arrives out of order and is buffered in
>> state; as the succeeding elements set the stored timestamp back to lower
>> values, it gets deleted by the state backend, although the user-level
>> windowing logic would expect it to still be there).
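>>
>> A small numeric sketch of that hazard, assuming the stored last-access
>> timestamp is the event timestamp of the current record (hypothetical
>> bookkeeping, not an actual Flink API):
>>
>>     long ttl = 50;
>>     long lastAccess = 100;  // key written by an element with event time 100;
>>                             // its state should survive until "time" 150
>>     lastAccess = 60;        // a succeeding element with event time 60
>>                             // overwrites it; the state now expires at
>>                             // "time" 110, i.e. 40 units earlier than the
>>                             // windowing logic expects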
>>
>> Given the last point made above, and apart from the semantics of the
>> proposed feature, I think that we should also discuss whether it is a good
>> idea to have event time TTL implemented at the state backend level in the
>> first place. Personally, I am not so convinced that this is a good idea,
>> as we introduce another (potentially competing) mechanism for handling
>> event time, apart from the user program. An example can be the one that I
>> described above. And this also defeats one of the main advantages of event
>> time, in my opinion, which is reproducibility of the results.
>>
>> I may be wrong, but I would appreciate any opinions on this.
>>
>> Cheers,
>> Kostas
>>
>> On Mon, Apr 8, 2019 at 11:12 AM Aljoscha Krettek <[hidden email]>
>> wrote:
>>
>>> Oh boy, this is an interesting pickle.
>>>
>>> For *last-access-timestamp*, I think only *event-time-of-current-record*
>>> makes sense. I’m looking at this from a GDPR/regulatory compliance
>>> perspective. If you update state, by say storing the event you just
>>> received in state, you want to use the exact timestamp of that event to
>>> drive expiration. Both *max-timestamp-of-data-seen-so-far* and
>>> *last-watermark* suffer from problems in edge cases: if the timestamp of
>>> an event you receive is quite a bit earlier than other timestamps that we
>>> have seen so far (i.e. the event is late) we would artificially lengthen
>>> the TTL of that event (which is stored in state) and would therefore
>>> break regulatory requirements. Always using the timestamp of the event
>>> doesn’t suffer from that problem.
>>>
>>> For *expiration-check-time*, both *last-watermark* and
>>> *current-processing-time* could make sense but I’m leaning towards
>>> *processing-time*. The reason is again the GDPR/compliance view: if we
>>> have an old savepoint with data that should have been expired by now but
>>> we re-process it with *last-watermark* expiration, we will get to “see”
>>> that state even though we shouldn’t be allowed to. If we use
>>> *current-processing-time* for expiration, we wouldn’t have that problem
>>> because that old data (according to its event-time timestamp) would be
>>> properly cleaned up and access would be prevented.
>>>
>>> To sum up:
>>> last-access-timestamp: event-time of event
>>> expiration-check-time: processing-time
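>>>
>>> As a sketch, that combination amounts to the following check (a
>>> hypothetical predicate, not an existing Flink API):
>>>
>>>     // lastAccessEventTime is the event-time timestamp of the record
>>>     // that wrote the state value
>>>     static boolean isExpired(long lastAccessEventTime, long ttlMillis) {
>>>         // wall clock on the expiration side: re-processing an old
>>>         // savepoint cannot resurrect data that should already be gone
>>>         return System.currentTimeMillis() >= lastAccessEventTime + ttlMillis;
>>>     }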
>>>
>>> What do you think?
>>>
>>> Aljoscha
>>>
>>>> On 6. Apr 2019, at 01:30, Konstantin Knauf <[hidden email]> wrote:
>>>>
>>>> Hi Andrey,
>>>>
>>>> I agree with Elias. This would be the most natural behavior. I wouldn't
>>>> add additional slightly different notions of time to Flink.
>>>>
>>>> As I can also see a use case for the combination
>>>>
>>>> * Timestamp stored: Event timestamp
>>>> * Timestamp to check expiration: Processing Time
>>>>
>>>> we could (maybe in a second step) add the possibility to mix and match
>>>> time characteristics for both aspects.
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> On Thu, Apr 4, 2019 at 7:59 PM Elias Levy <[hidden email]>
>>>> wrote:
>>>>
>>>>> My 2c:
>>>>>
>>>>> Timestamp stored with the state value: Event timestamp
>>>>> Timestamp used to check expiration: Last emitted watermark
>>>>>
>>>>> That follows the event time processing model used elsewhere in Flink.
>>>>> E.g. events are segregated into windows based on their event time, but
>>>>> the windows do not fire until the watermark advances past the end of
>>>>> the window.
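>>>>>
>>>>> In sketch form (a hypothetical predicate mirroring how event-time
>>>>> timers fire, not an existing Flink API):
>>>>>
>>>>>     // lastAccessEventTime is the event-time timestamp stored with the
>>>>>     // state value; expiration is checked against the last emitted
>>>>>     // watermark, just as a window fires once the watermark passes
>>>>>     // the end of the window
>>>>>     static boolean isExpired(long lastAccessEventTime, long ttlMillis,
>>>>>                              long currentWatermark) {
>>>>>         return currentWatermark >= lastAccessEventTime + ttlMillis;
>>>>>     }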