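Hi all:

I have been studying Flink and have run into a problem with checkpointing state in a user-defined window function. My code looks like this:

    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.checkpoint.Checkpointed
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Event is the application's own record type (definition not shown)
    class WindowStatistics extends WindowFunction[Event, Int, Tuple, TimeWindow]
        with Checkpointed[Option[Int]] {

      // User-defined state that should survive failures
      private var count = 0

      // The Scala WindowFunction receives the window contents as an Iterable
      override def apply(key: Tuple, window: TimeWindow, input: Iterable[Event],
                         out: Collector[Int]): Unit = {
        // count = XXXX
        // XXXXXXXX   (update logic elided in the original post)
        out.collect(count)
      }

      // Called on each checkpoint: return the state to persist
      override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] =
        Some(count)

      // Called on recovery with the last checkpointed state
      override def restoreState(state: Option[Int]): Unit = state match {
        case Some(c) => count = c
        case None    => count = 0
      }
    }

and

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

    env.enableCheckpointing(5000) // checkpoint every 5000 ms
    env.setStateBackend(new RocksDBStateBackend("file:///data/"))

When a checkpoint is taken, only the data in the window (the panes) is checkpointed; the user-defined state (count) is not included in the checkpoint. While debugging, I found that the check at line 123 of AbstractUdfStreamOperator.java,

    if (userFunction instanceof Checkpointed) { XXXXXX }

evaluates to false (for other operators, such as map and filter, it is true). The userFunction is actually a ScalaWindowFunctionWrapper object.

So my question is: is this a bug? If not, what is the design philosophy behind window checkpointing? In many scenarios users want to checkpoint state they define themselves, but the current design does not seem to support that. Or am I using window checkpointing the wrong way?

Thank you!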
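To make the reported behaviour concrete, here is a minimal pure-Scala model, assuming only what the post describes: the operator tests `userFunction instanceof Checkpointed`, and the object it tests is a wrapper around the user's function. `Checkpointed`, `WindowFn`, `WrappingWindowFn` and `wantsCheckpoint` are hypothetical stand-ins written for this sketch, not Flink's classes.

```scala
// Hypothetical stand-in for Flink's Checkpointed interface (not the real one)
trait Checkpointed[T] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): T
  def restoreState(state: T): Unit
}

// Hypothetical stand-in for a Scala window function
trait WindowFn[IN, OUT] {
  def apply(input: Iterable[IN]): OUT
}

// The user's function: keeps a running count and asks for it to be checkpointed
class WindowStatistics extends WindowFn[String, Int] with Checkpointed[Option[Int]] {
  private var count = 0
  def apply(input: Iterable[String]): Int = { count += input.size; count }
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Int] = Some(count)
  def restoreState(state: Option[Int]): Unit = { count = state.getOrElse(0) }
}

// Wrapper that forwards apply() only; like the ScalaWindowFunctionWrapper
// described in the post, it does not itself implement Checkpointed
class WrappingWindowFn[IN, OUT](val wrapped: WindowFn[IN, OUT]) extends WindowFn[IN, OUT] {
  def apply(input: Iterable[IN]): OUT = wrapped.apply(input)
}

// Mirrors the `userFunction instanceof Checkpointed` test in the operator
def wantsCheckpoint(userFunction: AnyRef): Boolean =
  userFunction.isInstanceOf[Checkpointed[_]]

val direct  = new WindowStatistics
val wrapped = new WrappingWindowFn(new WindowStatistics)

println(wantsCheckpoint(direct))  // true
println(wantsCheckpoint(wrapped)) // false: the wrapped function's snapshot hooks are never reached
```

The user function itself is `Checkpointed`, but the object the operator inspects is not, so the state is silently left out of the checkpoint.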
Is there a design document describing Flink's state backend and checkpoint architecture?

ChengXiang Li gave a talk on checkpointing (an in-depth analysis of Flink's distributed snapshots) at the Hangzhou Flink Meetup: https://github.com/pusuo/streaming-resource/blob/master/flink-meetup-hz-20161105/Flink%E5%88%86%E5%B8%83%E5%BC%8F%E5%BF%AB%E7%85%A7%E6%B7%B1%E5%85%A5%E5%88%86%E6%9E%90_%E6%9D%8E%E5%91%88%E7%A5%A5.pdf

Thanks

At 2016-11-26 10:30:52, "liuxinchun" <[hidden email]> wrote:
> [...]
Hi 时某人,
I think you've found an inconsistency in Flink's windowing API (though the same applies to the Java API). Handling operator state in the context of windows is somewhat delicate, however, because multiple windows can be in flight at the same time. I've pulled Aljoscha into this thread; he is more familiar with the windowing API and can probably give you a better explanation.

I think we should either allow it, or check that a window function does not implement the Checkpointed interface and, if it does, notify the user. Furthermore, I think we should document these subtle behaviour differences better.

Cheers,
Till

On Sat, Nov 26, 2016 at 4:47 AM, 时某人 <[hidden email]> wrote:
> [...]
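Till's second option (reject a Checkpointed window function and tell the user) could look roughly like the sketch below. It is hypothetical: the traits stand in for Flink's classes, and `unwrap` and `validateWindowFunction` are invented names. The point it tries to show is that such a check has to look through the wrapper; otherwise it would miss exactly the case reported in this thread.

```scala
// Hypothetical stand-ins for Flink's Checkpointed interface, a window function,
// and the Scala wrapper; none of these are Flink's actual classes
trait Checkpointed[T] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): T
  def restoreState(state: T): Unit
}

trait WindowFn[IN, OUT] { def apply(input: Iterable[IN]): OUT }

final class WrappingWindowFn[IN, OUT](val wrapped: WindowFn[IN, OUT]) extends WindowFn[IN, OUT] {
  def apply(input: Iterable[IN]): OUT = wrapped.apply(input)
}

// Look through any number of wrappers to reach the user's own function
@annotation.tailrec
def unwrap(fn: WindowFn[_, _]): WindowFn[_, _] = fn match {
  case w: WrappingWindowFn[_, _] => unwrap(w.wrapped)
  case other                     => other
}

// Fail fast instead of silently ignoring the user's snapshot/restore hooks
def validateWindowFunction(fn: WindowFn[_, _]): Unit = unwrap(fn) match {
  case c: Checkpointed[_] =>
    throw new IllegalArgumentException(
      s"${c.getClass.getSimpleName} implements Checkpointed, which is not supported for window functions")
  case _ => ()
}

// Example user functions
class SizeFn extends WindowFn[String, Int] {
  def apply(input: Iterable[String]): Int = input.size
}

class CountingFn extends WindowFn[String, Int] with Checkpointed[Int] {
  private var count = 0
  def apply(input: Iterable[String]): Int = { count += input.size; count }
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = count
  def restoreState(state: Int): Unit = { count = state }
}

validateWindowFunction(new WrappingWindowFn(new SizeFn)) // accepted
// validateWindowFunction(new WrappingWindowFn(new CountingFn)) would throw
```

Rejecting at job-construction time is the cheaper of Till's two options; allowing it would require deciding which of the in-flight windows the single snapshot belongs to.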
Hi,
this is indeed a bug (though I would almost see it as a feature, since using the Checkpointed interface there can indeed be problematic, as Till pointed out). The problem is that the Scala wrapper functions have to implement all kinds of interfaces so that they can forward them to the wrapped function; otherwise we would need a separate wrapper function for each combination of interfaces that a user function can implement.

In the long run, our use of interfaces for user functions does not seem to scale well in the Scala API.

Cheers,
Aljoscha

On Mon, 28 Nov 2016 at 10:49 Till Rohrmann <[hidden email]> wrote:
> [...]
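The forwarding approach described above can be sketched as a wrapper that implements `Checkpointed` itself and delegates to the user function only when that function implements it too. As before, all types here are hypothetical stand-ins, not Flink's `ScalaWindowFunctionWrapper`.

```scala
// Hypothetical stand-ins again (not Flink's real classes)
trait Checkpointed[T] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): T
  def restoreState(state: T): Unit
}

trait WindowFn[IN, OUT] { def apply(input: Iterable[IN]): OUT }

// A wrapper that implements Checkpointed itself and forwards to the wrapped
// function only when that function implements it as well
final class ForwardingWindowFn[IN, OUT](wrapped: WindowFn[IN, OUT])
    extends WindowFn[IN, OUT] with Checkpointed[Option[Any]] {

  def apply(input: Iterable[IN]): OUT = wrapped.apply(input)

  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Any] =
    wrapped match {
      case c: Checkpointed[_] => Some(c.snapshotState(checkpointId, checkpointTimestamp))
      case _                  => None // stateless user function: nothing to store
    }

  def restoreState(state: Option[Any]): Unit = (wrapped, state) match {
    // Unchecked cast: relies on the state having come from the same function type
    case (c: Checkpointed[_], Some(s)) => c.asInstanceOf[Checkpointed[Any]].restoreState(s)
    case _                             => ()
  }
}

class CountingFn extends WindowFn[String, Int] with Checkpointed[Int] {
  private var count = 0
  def apply(input: Iterable[String]): Int = { count += input.size; count }
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = count
  def restoreState(state: Int): Unit = { count = state }
}

// Snapshot through one wrapper, restore into a fresh one (as after a failure)
val before = new ForwardingWindowFn(new CountingFn)
before.apply(List("a", "b", "c"))
val snap  = before.snapshotState(1L, System.currentTimeMillis())
val after = new ForwardingWindowFn(new CountingFn)
after.restoreState(snap)
val result = after.apply(List("d"))
println(result) // 4
```

The cost is the one Aljoscha names: this wrapper now always reports itself as `Checkpointed` (returning `None` for stateless functions), and every further optional interface would need the same forwarding code in each wrapper, or a dedicated wrapper per combination.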
Dear Aljoscha:
I'm a colleague of 时某人 and took part in designing the checkpoint mechanism of a certain stream processing system, which is modeled on Flink's checkpoint mechanism. In my opinion, the most difficult part of checkpointing a stream is backing up large windows: in many applications a single window may contain hundreds of GB of data. We designed an incremental backup mechanism for windows. A complete window may be kept across several successive checkpoints; each checkpoint holds part of the window's data and records both the boundary of that partial window and the boundary of the complete window at that moment. On restore, the restoring thread scans several successive checkpoints and reassembles the whole window from the boundaries of the partial windows and of the complete window.

I think this could be a candidate approach for backing up windows. What's more, I think window checkpoints should support user-defined state via the Checkpointed interface. In many applications, users compute important state cumulatively over the history of the stream. Once lost, such state cannot be recovered with the window checkpoint mechanism of the current Flink version (1.1.3).

Syinchwun Leo

-----Original Message-----
From: Aljoscha Krettek [mailto:[hidden email]]
Sent: 28 November 2016 18:58
To: [hidden email]
Subject: Re: Window's Checkpoint problem

> [...]
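A toy, in-memory sketch of the incremental scheme described in this message, under stated assumptions: every element gets a monotonically increasing sequence number, boundaries are expressed as sequence numbers, and the window only ever evicts from its start. `PartialCheckpoint`, `IncrementalWindow` and `restoreWindow` are hypothetical names; this is neither the authors' implementation nor a Flink API, and a real system would write each partial checkpoint to durable storage rather than keep it in memory.

```scala
// One incremental checkpoint of a window (hypothetical layout, for illustration only)
final case class PartialCheckpoint[T](
    partialFrom: Long,       // first sequence number stored in this checkpoint
    partialTo: Long,         // one past the last sequence number stored
    windowFrom: Long,        // start of the complete window when the snapshot was taken
    data: Vector[(Long, T)]  // (sequence number, element) pairs in [partialFrom, partialTo)
)

// In-memory window buffer that backs up only what changed since the last checkpoint
final class IncrementalWindow[T] {
  private var buffer = Vector.empty[(Long, T)]
  private var nextSeq = 0L
  private var windowFrom = 0L
  private var lastCheckpointed = 0L

  def add(elem: T): Unit = { buffer = buffer :+ (nextSeq -> elem); nextSeq += 1 }

  // Drop elements before `from`, e.g. when a sliding window advances
  def evictBefore(from: Long): Unit = {
    windowFrom = math.max(windowFrom, from)
    buffer = buffer.filter(_._1 >= windowFrom)
  }

  def contents: Vector[T] = buffer.map(_._2)

  // Store only elements newer than the previous checkpoint, plus both boundaries
  def snapshot(): PartialCheckpoint[T] = {
    val from = math.max(lastCheckpointed, windowFrom)
    val cp = PartialCheckpoint(from, nextSeq, windowFrom, buffer.filter(_._1 >= from))
    lastCheckpointed = nextSeq
    cp
  }
}

// Rebuild the complete window from successive checkpoints (oldest first)
def restoreWindow[T](checkpoints: IndexedSeq[PartialCheckpoint[T]]): Vector[T] = {
  val latest = checkpoints.last
  var i = checkpoints.length - 1
  var parts = List(latest)
  // Scan back until a partial window starts at or before the complete window's start
  while (parts.head.partialFrom > latest.windowFrom && i > 0) {
    i -= 1
    parts = checkpoints(i) :: parts
  }
  parts.flatMap(_.data).filter(_._1 >= latest.windowFrom).map(_._2).toVector
}

val w = new IncrementalWindow[String]
w.add("a"); w.add("b")
val cp1 = w.snapshot()  // stores a, b
w.add("c")
w.evictBefore(1)        // the window now holds b, c
val cp2 = w.snapshot()  // stores only c
w.add("d")
val cp3 = w.snapshot()  // stores only d

val restored = restoreWindow(Vector(cp1, cp2, cp3))
println(restored) // Vector(b, c, d)
```

Here `cp2` and `cp3` each hold a single element, yet together with `cp1` they suffice to rebuild the window; the recorded complete-window boundary is what lets the restore discard `a`, which was evicted after `cp1` was taken.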
Syinchwun,
Can you share more technical details, such as design docs or papers about this approach, or is it confidential? It sounds interesting, but the details make a difference (in particular, the bookkeeping of the partial boundaries).

Paris

> On 28 Nov 2016, at 13:01, liuxinchun <[hidden email]> wrote:
> [...]