Run a Flink Job with a Custom ClassLoader (GSoC Hadoop abstraction layer)


Artem Tsikiridis
Hi,

When running a jar on Flink, I am trying to replace a class with a subclass
at runtime.

Let me explain why I believe this is necessary:

I am working on a Hadoop abstraction layer for Flink (Google Summer of Code
2014) so that users can run Hadoop jobs on it. Currently, the user doesn't
have to write any Flink-specific code apart from the actual entry point of
the application: a call to FlinkHadoopJobClient.runJob(JobConf) is enough.
Ideally, they shouldn't even have to write that.

Of course, an unmodified Hadoop job calls JobClient.runJob(JobConf), which
runs on Hadoop, and not FlinkHadoopJobClient.runJob(JobConf) (the subclass),
which runs on Flink.

I was wondering if there is a mechanism to run a Flink job with a custom
ClassLoader just by specifying it in the config on a per-job basis.

The proposed classloader is quite simple:

Every time a JobClient is requested, give a FlinkHadoopJobClient instead.
Delegate every other class to the parent classloader.
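A minimal Java sketch of such a loader, under stated assumptions. Because defineClass rejects bytecode whose internal name differs from the requested name, "give a FlinkHadoopJobClient" is modelled here as defining a replacement class that keeps the name JobClient (for example, a patched copy that forwards to FlinkHadoopJobClient). Where those replacement bytes come from is an assumption, expressed as a hypothetical overrides map from class name to a .class resource path. The loader also defines the classes of the user's own jars itself (child-first): a reference inside user code is resolved through the loader that defined that code, so if the user's driver class came from the parent, its JobClient reference would never reach this loader. SubstitutingClassLoader is a hypothetical name, not a Flink or Hadoop API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not a Flink or Hadoop API.
// - Names in `overrides` are defined here from replacement bytecode.
// - Classes from the user's jars (`userJars`) are also defined here (child-first),
//   so references inside user code resolve back through this loader.
// - Everything else is delegated to the parent.
class SubstitutingClassLoader extends URLClassLoader {
    // class name -> resource path of the replacement .class (must declare the same name)
    private final Map<String, String> overrides;

    SubstitutingClassLoader(URL[] userJars, ClassLoader parent, Map<String, String> overrides) {
        super(userJars, Objects.requireNonNull(parent, "parent"));
        this.overrides = Map.copyOf(overrides);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                String replacement = overrides.get(name);
                if (replacement != null) {
                    byte[] bytes = readReplacement(name, replacement);
                    // Throws NoClassDefFoundError if the bytecode's own name differs from `name`.
                    c = defineClass(name, bytes, 0, bytes.length);
                } else if (name.startsWith("java.")) {
                    c = getParent().loadClass(name); // core classes always come from the parent
                } else {
                    try {
                        c = findClass(name); // searches only this loader's user jars
                    } catch (ClassNotFoundException notInUserJars) {
                        c = getParent().loadClass(name); // "delegate every other class"
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    // Reads the replacement bytecode via normal resource lookup (parent, then user jars).
    private byte[] readReplacement(String name, String resource) throws ClassNotFoundException {
        try (InputStream in = getResourceAsStream(resource)) {
            if (in == null) {
                throw new ClassNotFoundException(name + " (replacement not found: " + resource + ")");
            }
            return in.readAllBytes();
        } catch (IOException e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}
```

In use, it would be constructed with the job jar's URL, the current loader as parent, and a single override for org.apache.hadoop.mapred.JobClient; anything outside the override map and the user jars still goes through normal parent delegation.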

The problem is that I just can't access the classloader that loads the
JobClient (or Flink user code in general).


I got it working with Javassist (a bytecode instrumentation library), but it
is very ugly and hacky. I'm sure there is a better way.

Am I missing an obvious way to set a classloader for Flink? Or maybe I
don't understand the chain of classloaders here, or in general?

Thank you in advance,

Artem

PS. My GSoC progress tracker issue:
https://issues.apache.org/jira/browse/FLINK-838

Re: Run a Flink Job with a Custom ClassLoader (GSoC Hadoop abstraction layer)

Robert Metzger
Hi Artem,

Have a look at the RegularPactTask.registerInputOutput() method. That is
where we load a usercode classloader. At runtime, two classloaders are
present:
a) The system classloader: it loads the jar files present when Flink starts
(those in the lib/ directory). It cannot load user code, since user code is
job-dependent.
b) The usercode classloader: it can load classes that are sent from the
client, either as serialized objects (for example, instances of mappers
created in the user code) or in a program jar file. The LibraryCacheManager,
which returns a usercode classloader, manages the jar files sent along
with the jobs.
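In plain Java terms, loading user classes that arrive as serialized objects means that deserialization has to resolve each class name through the job's loader instead of the default lookup. A minimal sketch of that general technique, assuming standard Java serialization; UserCodeObjectInputStream and UserCodeSerde are hypothetical names, and this is not Flink's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helper, not Flink's implementation: an ObjectInputStream that
// resolves every class name through a given (e.g. per-job usercode) loader.
class UserCodeObjectInputStream extends ObjectInputStream {
    private final ClassLoader userCodeLoader;

    UserCodeObjectInputStream(InputStream in, ClassLoader userCodeLoader) throws IOException {
        super(in);
        this.userCodeLoader = userCodeLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // initialize = false: deserializing should not run static initializers
            return Class.forName(desc.getName(), false, userCodeLoader);
        } catch (ClassNotFoundException e) {
            // names Class.forName rejects, such as primitive type names like "int"
            return super.resolveClass(desc);
        }
    }
}

final class UserCodeSerde {
    private UserCodeSerde() {}

    // Client side: turn a user object (a mapper instance, say) into bytes.
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Worker side: rebuild the object with classes taken from the job's own loader.
    static Object deserialize(byte[] data, ClassLoader userCodeLoader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new UserCodeObjectInputStream(new ByteArrayInputStream(data), userCodeLoader)) {
            return in.readObject();
        }
    }
}
```

The system classloader alone would fail in resolveClass for any class that exists only in the job's jar, which is why the loader is passed in explicitly.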

Am I right that you only need this classloader hack for submitting the job?
So if the user wants to run a Hadoop MapReduce job (without changing a single
line of code), they would run ./bin/flinkMR or something similar?
If that's the case, you don't need to care about Flink's runtime
classloaders. Just implement the classloader you outlined in your mail
and use it to load the jar file that contains the job's main() method.
Once you have extracted the JobConf, you should be good to go
"Flink only".

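A sketch of that launcher step, under assumptions: the loader passed in would be something like a substituting classloader over the user's jar (not constructed here), MainLauncher and DemoJob are hypothetical names, and DemoJob only stands in for a user's Hadoop driver class. Extracting the JobConf is not shown. The helper points the thread context classloader at the job's loader for the duration of the call, since some libraries look classes up through it, and invokes main(String[]) reflectively.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical launcher: runs a job's main(String[]) with classes taken from `loader`.
final class MainLauncher {
    private MainLauncher() {}

    static void runMain(ClassLoader loader, String mainClassName, String[] args) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        // Code that looks classes up via the context loader sees the job's classes too.
        current.setContextClassLoader(loader);
        try {
            Class<?> mainClass = Class.forName(mainClassName, true, loader);
            Method main = mainClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(main.getModifiers())) {
                throw new NoSuchMethodException(mainClassName + ".main(String[]) is not static");
            }
            main.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // Surface the job's own exception rather than the reflection wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } finally {
            current.setContextClassLoader(previous);
        }
    }
}

// Stand-in for a user's Hadoop driver class (hypothetical, demo only).
class DemoJob {
    static String[] lastArgs;
    static ClassLoader contextLoaderSeen;

    public static void main(String[] args) {
        lastArgs = args;
        contextLoaderSeen = Thread.currentThread().getContextClassLoader();
    }
}
```

Restoring the previous context loader in the finally block keeps a failed job from leaking its classloader into whatever runs next on the same thread.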
I hope the information I gave here is correct. Let me know if you need
further help.

Robert



On Fri, Jul 25, 2014 at 1:09 AM, Artem Tsikiridis wrote:
> I was wondering if there is a mechanism to run a Flink Job with a custom
> ClassLoader just by specifying it in the config on a per job basis.