Savepoint for time windows

5 messages

Savepoint for time windows

Ozan DENİZ
Hi everyone,

I am trying to implement the savepoint mechanism for my Flink project.

Here is the scenario:

I took a snapshot of the Flink application with the "flink savepoint <JobId>" command while the application was running.

After taking the snapshot, I canceled the job from the web UI and then changed the topology of the Flink application.
(To change the topology, I split the keyed stream into two separate keyed streams.)

After changing the topology, I ran the new application from the snapshot I took in the first step.

But after the new application started, the window that had been captured in the snapshot fired without receiving any new log.

My question is: is there any way to preserve the old window state so that it continues after starting the new topology from the snapshot?
     

Re: Savepoint for time windows

Ufuk Celebi
Can you please share the program before and after the savepoint?

– Ufuk

On Mon, Apr 18, 2016 at 3:11 PM, Ozan DENİZ <[hidden email]> wrote:


Re: Savepoint for time windows

Stephan Ewen
Hi!

Yes, window contents are part of savepoints. If you change the topology, it
is crucial that the new topology matches the old window contents to the new
operator.

If you change the structure of the program, you probably need to assign
persistent names to the operators. See
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html#changes-to-your-program
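A toy model of the point above, in plain Java rather than Flink code. It assumes, as the linked page describes, that a savepoint stores each operator's state under that operator's ID and that each operator of the restarted program looks up its own ID. An operator whose ID is not found starts with empty state. All IDs and state values ("source", "window", "auto-generated-after-change", "offset=42") are made up for illustration, and the model deliberately ignores what happens to saved state that no new operator claims; real Flink versions may handle that case differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (plain Java, not Flink's API): savepoint state is looked up by operator ID.
public class SavepointIdMatching {

    // Restores each operator of the new program from the savepoint by looking up
    // its ID. Operators whose ID is not in the savepoint start with empty state.
    // Saved state that no new operator asks for is simply ignored in this model.
    static Map<String, String> restore(Map<String, String> savepoint, String... newOperatorIds) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (String id : newOperatorIds) {
            restored.put(id, savepoint.getOrDefault(id, "<empty>"));
        }
        return restored;
    }

    public static void main(String[] args) {
        // State captured by the savepoint, keyed by (hypothetical) operator IDs.
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put("source", "offset=42");
        savepoint.put("window", "3 buffered logs");

        // Stable, user-assigned ID on the window operator: its buffered logs come back.
        System.out.println(restore(savepoint, "source", "window"));

        // The window operator's ID changed with the topology (hypothetical value):
        // nothing is stored under the new ID, so the window starts empty.
        System.out.println(restore(savepoint, "source", "auto-generated-after-change"));
    }
}
```

Running main prints {source=offset=42, window=3 buffered logs} followed by {source=offset=42, auto-generated-after-change=<empty>}, which is the difference a persistent operator name makes in this model.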

Stephan


On Mon, Apr 18, 2016 at 5:43 PM, Ufuk Celebi <[hidden email]> wrote:


RE: Savepoint for time windows

Ozan DENİZ
Hi Stephan and Ufuk,

Thank you for your reply.

I have assigned uids to the "assignTimestampsAndWatermarks", "addSource", and "apply" operators. However, I couldn't assign a uid to the time window itself. Therefore, the time window doesn't hold any state regarding timestamps.

For example, I implemented a custom window trigger.
Trigger condition: the window fires after 4 logs or after 1 day, whichever comes first (we are using event time).

After sending 3 logs, I took a snapshot of the running application, canceled the job, and then changed the topology. To change the topology, I just split the data stream into two separate data streams. I re-ran the application with the new topology from the snapshot and did not send any logs to it. In this case, the window should not trigger and should not call the apply function.
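The trigger condition above can be sketched as a small state machine. This is a plain-Java toy model, not Flink's Trigger API: the class and method names are hypothetical, and the window is assumed to span exactly one day of event time from a given start timestamp. It shows why, with 3 logs buffered and no further input, neither condition is met until the watermark reaches the end of the day.

```java
// Toy model (plain Java, not Flink's Trigger API) of a "4 logs or 1 day" trigger.
public class CountOrDayTrigger {

    static final int MAX_COUNT = 4;                       // fire after this many logs
    static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;  // window length in event time

    private final long windowEnd;  // event-time end of the (assumed) 1-day window, in ms
    private int count = 0;         // logs seen so far in this window

    CountOrDayTrigger(long windowStart) {
        this.windowEnd = windowStart + ONE_DAY_MS;
    }

    // Called for every incoming log; returns true if the window should fire.
    boolean onElement() {
        count++;
        return count >= MAX_COUNT;
    }

    // Called when the event-time watermark advances; returns true if the window should fire.
    boolean onWatermark(long watermark) {
        return watermark >= windowEnd;
    }

    int count() {
        return count;
    }

    public static void main(String[] args) {
        CountOrDayTrigger trigger = new CountOrDayTrigger(0L);

        // Three logs: the count condition is not met yet.
        for (int i = 1; i <= 3; i++) {
            System.out.println("fire after log " + i + "? " + trigger.onElement());
        }
        // A watermark one hour into the day does not fire the window either.
        System.out.println("fire at watermark 1h? " + trigger.onWatermark(60L * 60 * 1000));
        // The 4th log satisfies the count condition.
        System.out.println("fire after log 4? " + trigger.onElement());
    }
}
```

In this model the count lives inside the trigger, so it only survives a restart if the trigger's state is restored together with the window contents.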

But when I checked the output file, I saw that the window had been triggered right after I re-ran the application with the new topology.

I think it flushes the old window.

Is there any way to keep the old window state and continue with incoming logs?



> Date: Mon, 18 Apr 2016 18:04:50 +0200
> Subject: Re: Savepoint for time windows
> From: [hidden email]
> To: [hidden email]

Re: Savepoint for time windows

Aljoscha Krettek
Hi,
Setting the uid on the result of the .apply() call is sufficient for the
whole window operation, including the windowing and the trigger.
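A plain-Java toy model of why one uid is enough (not Flink code; the class, its fields and the ID "my-window" are hypothetical): the windowing, the trigger and the window function run inside a single window operator, so the buffered window contents and the trigger's own state are snapshotted together under that operator's one ID. In a Flink program, that ID would be the one set with .uid(...) on the stream returned by .apply().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Flink code): one window operator, one ID, all of its state together.
public class OneUidPerWindowOperator {

    static final class WindowOperator {
        final String uid;                                   // stands in for .uid(...) on the apply() result
        final List<String> windowContents = new ArrayList<>(); // logs buffered in the window
        int triggerCount = 0;                               // state kept by a counting trigger

        WindowOperator(String uid) {
            this.uid = uid;
        }

        // A new log is buffered in the window and counted by the trigger.
        void onLog(String log) {
            windowContents.add(log);
            triggerCount++;
        }
    }

    // Snapshot: window contents and trigger state are stored under the single uid.
    static Map<String, String> snapshot(WindowOperator op) {
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put(op.uid, "contents=" + op.windowContents + ", count=" + op.triggerCount);
        return savepoint;
    }

    public static void main(String[] args) {
        WindowOperator op = new WindowOperator("my-window");
        op.onLog("log-1");
        op.onLog("log-2");
        op.onLog("log-3");
        // Prints {my-window=contents=[log-1, log-2, log-3], count=3}
        System.out.println(snapshot(op));
    }
}
```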

Could you maybe post some example code of the topology before and after the
change, and of how you restore from the savepoint?

Cheers,
Aljoscha

On Tue, 19 Apr 2016 at 07:33 Ozan DENİZ <[hidden email]> wrote:
