Changing application configuration when restoring from checkpoint/savepoint


vishalovercome
My Flink job loads several configuration files that contain job, operator and
business configuration. One of the operators is an async operator with a
function like this:

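import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}

class AsyncFun(config: T) extends RichAsyncFunction[X, Y] {
  // Created lazily on first use as an alternative to overriding open();
  // @transient keeps these fields out of the serialized function.
  @transient private lazy val client = f(config, metricGroup, etc.)
  @transient private lazy val metricGroup = ...

  override def asyncInvoke(input: X, resultFuture: ResultFuture[Y]): Unit = ...
}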

The variables are declared lazily as an alternative to implementing the open
method. This is unavoidable because we rely on Flink's monitoring libraries.
The application resumes from a checkpoint upon unexpected termination.
However, sometimes I want to change the config parameter that's passed as a
constructor argument, and that doesn't work because Flink restores from the
submittedJobGraph. This makes sense: Flink by itself doesn't know whether it
is recovering from an abrupt termination (and must therefore rely on the old
config to build the client) or whether it should start afresh. I want to know
what options we have to allow configuration changes (i.e. re-initializing
the operators):

1. Is there any way to restore from a checkpoint and still recreate the client
using a newer configuration?
2. If we take a savepoint (drain and save) and then resume the job, will the
configuration changes take effect?
3. Will we have to move away from Flink monitoring so that the client can be
initialized inside the constructor?
4. One option is to remove the constructor argument entirely and load the
config inside the open method. How can this be done without exposing the
entire application configuration? I could store the configuration in the job
parameters (by converting this object to a map, which I would rather not do),
but how would I load it back, given that this function is used by multiple
operators?
5. Any other option?

For functions that aren't AsyncFunctions, is BroadcastState the only way to
update configuration dynamically?




--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: Changing application configuration when restoring from checkpoint/savepoint

vishalovercome
Will this work? In the main method, serialize the config into a string and
store it using ParameterTool, with the task name as the key and the serialized
config as the value. Then, in the open method, look up the relevant
configuration using getTaskName().
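The proposal above can be modelled in plain Scala, without a Flink dependency, as a sketch under stated assumptions: OperatorConfig, its fields, the "|" encoding and the operator names are all hypothetical. In a real job, main() would convert the map to a java.util.Map, wrap it with ParameterTool.fromMap and register it with env.getConfig.setGlobalJobParameters; open() would read it back through getRuntimeContext.getExecutionConfig.getGlobalJobParameters (cast to ParameterTool) and pick its entry with getRuntimeContext.getTaskName.

```scala
// Hypothetical per-operator settings standing in for the real config type T.
case class OperatorConfig(endpoint: String, timeoutMs: Int)

object PerTaskConfig {
  // Minimal string encoding ("endpoint|timeoutMs"); assumes the endpoint has no '|'.
  def encode(c: OperatorConfig): String = s"${c.endpoint}|${c.timeoutMs}"

  def decode(s: String): OperatorConfig = s.split('|') match {
    case Array(endpoint, timeout) => OperatorConfig(endpoint, timeout.toInt)
    case _ => throw new IllegalArgumentException(s"malformed config: $s")
  }

  // main(): one entry per operator, keyed by the name the operator runs under.
  def buildParams(configs: Map[String, OperatorConfig]): Map[String, String] =
    configs.map { case (name, c) => name -> encode(c) }

  // open(): fetch and decode the entry for this operator's name.
  def lookup(params: Map[String, String], taskName: String): OperatorConfig =
    params.get(taskName) match {
      case Some(encoded) => decode(encoded)
      case None => throw new NoSuchElementException(s"no config for task '$taskName'")
    }
}
```

Two caveats, stated with some hedging: for chained operators the task name may be the combined chain name rather than the operator's own name, so passing a small, stable lookup key as the constructor argument may be more robust. Also, global job parameters are stored in the job's ExecutionConfig, which is part of the submitted job graph, so automatic recovery would presumably restore the old values just like the constructor argument; new values would only take effect once the job is resubmitted, for example from a savepoint.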

A follow-up question concerns custom windowing functions. I have a window
class that is both size- and time-based, where the size and time limits are
configurable and passed as constructor arguments. How can this configuration
be changed when state persistence/recovery is enabled? A window doesn't have
an open method per se.
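A window assigner or trigger has no open() hook, but it can resolve its limits lazily on first use, the same way AsyncFun does with its client. The following plain-Scala model of a count-or-timeout firing rule is a sketch with hypothetical names (WindowLimits, SizeOrTimeRule, loadLimits), not the poster's class. In Flink, shouldFire would be consulted from the trigger's onElement and onProcessingTime callbacks, the lazy val would be marked @transient, and the loader function would have to be serializable.

```scala
import scala.concurrent.duration._

// Hypothetical limits for a window that fires on N elements or after a delay.
case class WindowLimits(maxCount: Int, maxDelay: FiniteDuration)

// Model of a count-or-timeout firing rule whose limits are resolved lazily,
// mirroring a @transient lazy val in a Trigger that has no open() hook.
class SizeOrTimeRule(loadLimits: () => WindowLimits) {
  private lazy val limits: WindowLimits = loadLimits()

  // Fire once the window holds maxCount elements or its first element is maxDelay old.
  def shouldFire(count: Int, firstElementMs: Long, nowMs: Long): Boolean =
    count >= limits.maxCount || nowMs - firstElementMs >= limits.maxDelay.toMillis
}
```

The loader runs once per instance, on the first shouldFire call, so limits are fixed for the lifetime of that instance.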





Re: Changing application configuration when restoring from checkpoint/savepoint

Aljoscha Krettek-2
In reply to this post by vishalovercome
On Wed Dec 16, 2020 at 6:41 PM CET, vishalovercome wrote:
> 1. Is there any way to restore from a checkpoint as well as recreate client
> using newer configuration?

I think that would only work if you somehow read the configuration from an external system.
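A minimal sketch of that idea, assuming the settings live in a properties file that every task manager can read (the class name, path, file format and key names are hypothetical; a configuration service would work the same way). Only the path is a constructor argument; the contents are read when an instance first needs them, so an instance rebuilt during recovery sees the file as it is at that moment.

```scala
import java.io.FileReader
import java.util.Properties

// Only the path is a constructor argument; the settings are read from the
// external file the first time an instance needs them.
class ExternallyConfigured(configPath: String) {
  // In a RichAsyncFunction this would be a @transient lazy val (or assigned in
  // open()), so it is evaluated again after every (re)deployment of the task.
  lazy val settings: Map[String, String] = {
    val props = new Properties()
    val reader = new FileReader(configPath)
    try props.load(reader) finally reader.close()
    val builder = Map.newBuilder[String, String]
    val keys = props.stringPropertyNames().iterator()
    while (keys.hasNext) {
      val key = keys.next()
      builder += key -> props.getProperty(key)
    }
    builder.result()
  }
}
```

An instance that has already loaded its settings keeps them; only a newly initialized instance, such as one restored after a failure, reads the updated file.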

> 2. If we take a savepoint (drain and save) and then resume the job, then
> will the configuration changes happen?

This should work, and you don't even need to drain. A plain stop with savepoint should be enough: https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint

> For functions that aren't AsyncFunction, is leveraging BroadcastState the
> only way to dynamically update configuration?

I think this is true, yes.
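For reference, the broadcast-state pattern can be modelled in plain Scala as a sketch (no Flink dependency; ConfigUpdate, Event, ThresholdFilter and the "threshold" key are hypothetical). In Flink, the config stream would be broadcast with a MapStateDescriptor and connected to the data stream; processBroadcastElement would write through ctx.getBroadcastState(descriptor), and processElement would read the same state through the read-only context. Because broadcast state is checkpointed, the last update should survive a restore.

```scala
import scala.collection.mutable

// Hypothetical config update and event types.
case class ConfigUpdate(key: String, value: String)
case class Event(id: String, amount: Int)

// Mirrors the two callbacks of a BroadcastProcessFunction; the mutable map
// stands in for the broadcast state that Flink would checkpoint.
class ThresholdFilter(defaultThreshold: Int) {
  private val broadcastState = mutable.Map.empty[String, String]

  // Called for every element of the broadcast config stream.
  def processBroadcastElement(update: ConfigUpdate): Unit =
    broadcastState(update.key) = update.value

  // Called for every data element; reads the latest config (read-only in Flink).
  def processElement(event: Event): Option[Event] = {
    val threshold = broadcastState.get("threshold").map(_.toInt).getOrElse(defaultThreshold)
    if (event.amount >= threshold) Some(event) else None
  }
}
```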

Best,
Aljoscha