My Flink job loads several configuration files that contain job, operator and
business configuration. One of the operators is an AsyncOperator with a function like so:

    class AsyncFun(config: T) extends RichAsyncFunction[X, Y] {
      @transient private lazy val client = f(config, metricGroup, etc.)
      @transient private lazy val metricGroup = ...
      def asyncInvoke(....)
    }

The variables are declared lazily as an alternative to implementing the open method. This is unavoidable as we're relying on Flink's monitoring libraries.

The application resumes from a checkpoint upon unexpected termination. However, sometimes I want to change the config parameter that's passed as a constructor argument, but that doesn't work because Flink restores from the submittedJobGraph. This makes sense: Flink by itself doesn't know whether it's recovering from an abrupt termination, and must therefore rely on the old config to build the client, or whether it should start afresh.

I want to know what options we have to allow for configuration changes (i.e. re-initializing the operators):

1. Is there any way to restore from a checkpoint as well as recreate the client using newer configuration?
2. If we take a savepoint (drain and save) and then resume the job, will the configuration changes take effect?
3. Will we have to move away from Flink monitoring so as to initialize the client inside the constructor?
4. One option is to remove the constructor argument entirely and load the config inside the open method. I want to know how this can be done without exposing the entire application configuration. I could store the configuration inside the job parameters (by somehow converting this object to a map, which I don't want to do), but how do I load it back, given that this operator function is used by multiple operators?
5. Any other option?

For functions that aren't AsyncFunction, is leveraging BroadcastState the only way to dynamically update configuration?

-- Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Will this work? In the main method, serialize the config into a string and store it
using ParameterTool, with the taskName as key and the config (serialized as a string) as value. Then, in the open method, look up the relevant configuration using getTaskName().

A follow-up to this would be configuring custom windowing functions. I have a size-based as well as a time-based window class where the size and time limits are configurable and passed as constructor arguments. How can this configuration be changed when state persistence/recovery is enabled? A window doesn't have an open method per se.
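A minimal sketch of the idea in this post, in plain Scala so it can be tried without a cluster. ClientConfig, OperatorConfigParams, the "operator.config." key prefix and the "key=value;key=value" encoding are all hypothetical; the config type from the original post is not shown. The resulting map could presumably be handed to Flink with ParameterTool.fromMap(...) (after converting to a java.util.Map) and env.getConfig.setGlobalJobParameters(...), and read back in open() through getRuntimeContext.getExecutionConfig.getGlobalJobParameters.toMap.

```scala
// Hypothetical config type; stands in for the config class of the original post.
final case class ClientConfig(endpoint: String, timeoutMs: Long)

object OperatorConfigParams {
  // Hypothetical key prefix so operator configs do not clash with other job parameters.
  private val Prefix = "operator.config."

  // Naive "key=value;key=value" encoding; assumes values never contain ';'.
  def encode(c: ClientConfig): String =
    s"endpoint=${c.endpoint};timeoutMs=${c.timeoutMs}"

  def decode(s: String): ClientConfig = {
    val fields = s.split(';').map { kv =>
      // Split on the first '=' only, so values may themselves contain '='.
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap
    ClientConfig(fields("endpoint"), fields("timeoutMs").toLong)
  }

  // One flat string entry per operator name, ready to be stored as job parameters.
  def toJobParameters(configs: Map[String, ClientConfig]): Map[String, String] =
    configs.map { case (name, c) => (Prefix + name) -> encode(c) }

  // Lookup an operator would perform in open(), given the job parameters as a map.
  def configFor(params: Map[String, String], operatorName: String): Option[ClientConfig] =
    params.get(Prefix + operatorName).map(decode)
}
```

Two caveats, both worth checking against the Flink version in use. getTaskName() returns the name of the task, which for chained operators may be the name of the whole chain rather than of a single operator, so passing a short lookup key as a constructor argument may be more robust. Also, global job parameters are part of the submitted job, so this only exposes less configuration to each operator; new values should still only be picked up when the job is resubmitted.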
In reply to this post by vishalovercome
On Wed Dec 16, 2020 at 6:41 PM CET, vishalovercome wrote:
> 1. Is there any way to restore from a checkpoint as well as recreate
> client using newer configuration?

I think that would only work if you somehow read the configuration from an external system.

> 2. If we take a savepoint (drain and save) and then resume the job, then
> will the configuration changes happen?

This should work, but you won't even need a drain. Just a stop with savepoint should work:

https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint

> For functions that aren't AsyncFunction, is leveraging BroadcastState
> the only way to dynamically update configuration?

I think this is true, yes.

Best,
Aljoscha
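For readers unfamiliar with the BroadcastState approach mentioned in the last answer, here is a rough sketch using the Flink Scala DataStream API. It assumes flink-streaming-scala is on the classpath; BroadcastConfigSketch, ConfigurableFn, the "operator-config" state name and the "prefix" setting are made up for illustration and do not come from the thread.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastConfigSketch {
  // Broadcast state holding the current settings as string key/value pairs.
  val configDescriptor = new MapStateDescriptor[String, String](
    "operator-config", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  // Events are Strings; config updates arrive as (key, value) pairs.
  class ConfigurableFn extends BroadcastProcessFunction[String, (String, String), String] {

    // Data path: read-only access to whatever configuration has arrived so far.
    override def processElement(
        value: String,
        ctx: BroadcastProcessFunction[String, (String, String), String]#ReadOnlyContext,
        out: Collector[String]): Unit = {
      // "prefix" is a hypothetical setting; fall back to "" until an update arrives.
      val prefix = Option(ctx.getBroadcastState(configDescriptor).get("prefix")).getOrElse("")
      out.collect(prefix + value)
    }

    // Control path: every parallel instance receives each update and stores it.
    override def processBroadcastElement(
        update: (String, String),
        ctx: BroadcastProcessFunction[String, (String, String), String]#Context,
        out: Collector[String]): Unit =
      ctx.getBroadcastState(configDescriptor).put(update._1, update._2)
  }

  // Connect the event stream to the broadcast config stream.
  def wire(events: DataStream[String],
           updates: DataStream[(String, String)]): DataStream[String] =
    events.connect(updates.broadcast(configDescriptor)).process(new ConfigurableFn)
}
```

Since the settings live in broadcast state, they are included in checkpoints and savepoints, so the most recent update should survive a restore; the stream of updates itself has to come from some external source such as a topic or a file.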