Introducing Flink's Plugin mechanism

Introducing Flink's Plugin mechanism

Piotr Nowojski-3
Hi Flink developers,

I would like to introduce a new plugin loading mechanism that we are currently working on [1]. The idea is quite simple: isolate services in separate, independent class loaders, so that classes and dependencies do not leak between them or into the Flink runtime itself. Currently we have quite a few problems with dependency convergence in multiple places. Some of them we solve by shading (built-in file systems, metrics), some we force users to deal with themselves (custom file systems/metrics), and others we do not solve at all (connectors: we do not support using different Kafka versions in the same job/SQL). With proper plugins that are loaded in independent class loaders, those issues could be solved in a generic way.

The current scope of the implementation targets only file systems, without a centralised Plugin architecture and with Plugins that are only “statically” initialised at TaskManager and JobManager startup. More or less, we are just replacing the way FileSystem implementations are discovered and loaded.

In the future this idea could be extended to other modules, like metric reporters, connectors, functions/data types (especially in SQL), state backends, internal storage or other future efforts. Some of those would be easier than others: the metric reporters would require a smaller refactoring, while connectors would require bigger API design discussions, which I would like to avoid at the moment. Nevertheless, I wanted to reach out with this idea so that, if other potential use cases pop up in the future, more people will be aware of it.

Piotr Nowojski


[1] https://issues.apache.org/jira/browse/FLINK-11952

Re: Introducing Flink's Plugin mechanism

Jeff Zhang
Thanks, Piotr, for driving this plugin mechanism. Pluggability is pretty
important for the Flink ecosystem.

On Wed, Apr 10, 2019 at 5:48 PM, Piotr Nowojski <[hidden email]> wrote:


--
Best Regards

Jeff Zhang

Re: Introducing Flink's Plugin mechanism

Zhijiang(wangzhijiang999)
Thanks, Piotr, for proposing this new feature.

A solution to the class loader issue is really helpful in production, and we often ran into this pain point before.
This pluggable mechanism might open up more possibilities. I hope to see progress soon. :)

Best,
Zhijiang
------------------------------------------------------------------
From: Jeff Zhang <[hidden email]>
Send Time: Wednesday, April 10, 2019, 22:01
To: dev <[hidden email]>
Subject: Re: Introducing Flink's Plugin mechanism


Re: Introducing Flink's Plugin mechanism

Stefan Richter-2
Thank you, Piotr, for bringing this discussion to the mailing list! As it was not explicitly mentioned in the first email, I wanted to add that there is already an open PR [1] with my implementation of the basic plugin mechanism for FileSystem. Looking forward to feedback from the community.


[1] https://github.com/apache/flink/pull/8038

Best,
Stefan

On 10 Apr 2019, at 17:08, zhijiang <[hidden email]> wrote:

Re: Introducing Flink's Plugin mechanism

Biao Liu
Hi Stefan & Piotr,
Thank you for starting this discussion. As Zhijiang said, class conflicts
cause a lot of trouble in our production environment.
I was wondering whether there is a design document yet. It might help
in understanding the PR and even the bigger picture, since, as Piotr said,
it could be extended to other modules in the future.

On Wed, Apr 10, 2019 at 11:22 PM, Stefan Richter <[hidden email]> wrote:


Re: Introducing Flink's Plugin mechanism

Piotr Nowojski-3
Hi Biao,

No, there is not. The current scope is so small, and it was actually so easy to implement, that a design doc would have been overkill. As for the future plans, we consciously decided not to account or plan for them at the moment, but to tackle them lazily.

The idea behind the current proposal is described at the moment in the Javadocs [1] and in the tests, and there will of course be a documentation update. The gist is to have a `./plugins` directory with subdirectories of jars:

./plugins/pluginA/foo-1.0.jar
./plugins/pluginA/bar-0.9.jar

./plugins/pluginB/foo-2.0.jar

All of the jars in each plugin subdirectory will be loaded in a separate class loader per plugin, and instead of looking for `FileSystem.class` implementations in Flink’s main class loader, we will iterate over those plugins’ class loaders and search inside them. This is a simple extension of the current mechanism.
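
To make the directory layout above more concrete, here is a minimal sketch of the general technique, assuming one plain `URLClassLoader` per plugin subdirectory and `java.util.ServiceLoader` for the lookup. It is not the code from the PR: the class and method names (`PluginDirectorySketch`, `createPluginClassLoaders`, `discover`) are made up for illustration.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.TreeMap;

// Hypothetical sketch for illustration only; not the implementation from the PR.
final class PluginDirectorySketch {

    // Builds one class loader per subdirectory of pluginsRoot, keyed by directory name.
    // For the layout above this yields two loaders: "pluginA" (two jars) and "pluginB" (one jar).
    static Map<String, URLClassLoader> createPluginClassLoaders(Path pluginsRoot, ClassLoader parent)
            throws IOException {
        Map<String, URLClassLoader> loaders = new TreeMap<>();
        try (DirectoryStream<Path> pluginDirs = Files.newDirectoryStream(pluginsRoot)) {
            for (Path pluginDir : pluginDirs) {
                if (!Files.isDirectory(pluginDir)) {
                    continue; // ignore stray files next to the plugin directories
                }
                List<URL> jarUrls = new ArrayList<>();
                try (DirectoryStream<Path> jars = Files.newDirectoryStream(pluginDir, "*.jar")) {
                    for (Path jar : jars) {
                        jarUrls.add(jar.toUri().toURL());
                    }
                }
                loaders.put(pluginDir.getFileName().toString(),
                        new URLClassLoader(jarUrls.toArray(new URL[0]), parent));
            }
        }
        return loaders;
    }

    // Iterates over the plugin class loaders and collects every implementation of the
    // given service type that ServiceLoader can see through each of them.
    // Note: ServiceLoader also reports providers visible through the parent loader,
    // so those would show up once per plugin loader in this simple version.
    static <S> List<S> discover(Class<S> service, Collection<? extends ClassLoader> pluginLoaders) {
        List<S> implementations = new ArrayList<>();
        for (ClassLoader loader : pluginLoaders) {
            for (S implementation : ServiceLoader.load(service, loader)) {
                implementations.add(implementation);
            }
        }
        return implementations;
    }
}
```

One caveat of this sketch: a plain `URLClassLoader` asks its parent first, so it keeps `foo-1.0.jar` and `foo-2.0.jar` apart from each other only as long as neither is also on the parent's class path. Hiding the parent's own dependencies from a plugin would need a different delegation order.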

In the future we will need to discuss how we want to evolve this mechanism to handle more than just `FileSystem`s:
1. Do we want to introduce some centralised architecture, in which we would discover `Plugin.class` implementations and a `Plugin` would provide `FileSystem`, `MetricReporter` and `Connector` implementations? Or would we discover `FileSystem`, `MetricReporter` and `Connector` separately?
2. Do we want to support “dynamically” loaded plugins, provided during job submission?
3. Do we want to somehow support DataStream Sources/Sinks as plugins? If yes, what would the API look like? Some Sinks/Sources factory interface shared between connectors?
4. How do we expose `Plugins` inside operators, especially in Table API/SQL operators, to support TableSource/SinkFactories loaded via plugins?
5. …?

Those are some of the questions that we are intentionally trying to avoid at the moment.

Piotrek  

[1] https://github.com/apache/flink/pull/8038/commits/06d07e846d1b3682538cea203de1371fc79b9940#diff-0599320f7dc729e412841c58534d6fe5

On 10 Apr 2019, at 17:40, Biao Liu <[hidden email]> wrote:

Re: Introducing Flink's Plugin mechanism

Rong Rong
Hi All,

Sorry for joining the discussion late. Thanks, Piotrek, for initiating this
effort.

I recall reporting a very similar bug years ago [1] that was not easily
solvable at the time, so +1 on this feature going beyond just FileSystem :-)
I think this would definitely be beneficial as a way for others to
build specialized frameworks/platforms on top.

Here are some of my thoughts on the questions raised.
1. I am not exactly sure about the implementation details, but currently
Flink's Table factory discovery system [2] also handles loading of various
table systems, in a way. I was wondering if this would be an
acceptable way to discover services. A similar question was also raised in
the discussion on supporting the Service-Provider pattern [3] in Flink's
security module installation.
3. I am assuming this means the DataStream object directly, not via
StreamExecutionEnvironment.addSource(sourceFunction) [4]. In this case, how
would the DataStream object be instantiated? Is there any reason we would want to
support a direct DataStream source/sink without going through the source/sink
factory?
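
On point 1: the table factory discovery described in [2] builds, as far as the linked documentation states, on Java's service-provider mechanism (`java.util.ServiceLoader` plus a `META-INF/services/...` file that names the implementations). The following is only a rough, self-contained sketch of that general pattern. The names `GreetingFactory` and `HelloFactory` are made up and merely stand in for a real factory interface and its implementation, and the provider file is written to a temporary directory at run time purely to keep the example in a single file.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical sketch of the service-provider pattern; not Flink code.
final class ServiceProviderSketch {

    // Made-up service interface, standing in for something like a table factory.
    public interface GreetingFactory {
        String create();
    }

    // Made-up provider. ServiceLoader needs a public class with a public no-arg constructor.
    public static final class HelloFactory implements GreetingFactory {
        public HelloFactory() {}

        @Override
        public String create() {
            return "hello";
        }
    }

    static List<String> discoverGreetings() throws IOException {
        // Normally META-INF/services/<interface name> ships inside the provider's jar.
        // Here it is generated on the fly so that the sketch needs no extra files.
        Path dir = Files.createTempDirectory("spi-sketch");
        Path services = Files.createDirectories(dir.resolve("META-INF").resolve("services"));
        Files.write(services.resolve(GreetingFactory.class.getName()),
                HelloFactory.class.getName().getBytes(StandardCharsets.UTF_8));

        List<String> results = new ArrayList<>();
        // The loader contributes the provider file; the classes themselves come from its parent.
        try (URLClassLoader loader = new URLClassLoader(
                new URL[] {dir.toUri().toURL()}, ServiceProviderSketch.class.getClassLoader())) {
            for (GreetingFactory factory : ServiceLoader.load(GreetingFactory.class, loader)) {
                results.add(factory.create());
            }
        }
        return results;
    }
}
```

The class loader passed to `ServiceLoader.load` decides which provider files are visible, which is why the same lookup could in principle be pointed at a plugin's class loader instead of the main one.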

I have not thought through (2) and (4), but I think they are all very
valid questions, as we also suffer from these pain points when managing
our prod environment.
Looking forward to contributing to this effort!

Best,
Rong

[1] https://issues.apache.org/jira/browse/FLINK-7373
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#define-a-tablefactory
[3] https://docs.google.com/document/d/1j96kjf-Nbk8Kii276SLSajhpCUuNHvNO5feZUnnPcPE/
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumer



On Wed, Apr 10, 2019 at 12:25 PM Piotr Nowojski <[hidden email]> wrote:


Re: Introducing Flink's Plugin mechanism

Piotr Nowojski-3
Hi Rong,

It is definitely doable to use Plugins for TableFactories discovery, but it would require more extensive changes/discussions. A couple of challenges there:

1. Currently TableFactories come from the user job’s jar, which means that either we would need support for dynamically loaded plugins, or pluggable TableFactories would have to be shipped statically (which I think should be acceptable, at least initially, but it is a different setup compared to the status quo).
2. How do we expose Plugins at the public API level? That would have to be solved, because they would need to be accessed in the StreamingTableSource.

Piotrek

On 17 Apr 2019, at 02:59, Rong Rong <[hidden email]> wrote:
>
> Hi All,
>
> Sorry for joining the discussion late. Thanks Piotrek for initiating this
> effort.
>
> I recall reporting a very similar bug years ago[1] that was not easily
> solvable at the time, so +1 on this feature goes beyond just FileSystem :-)
> I think this would definitely be beneficial as a useful way for others to
> extend on top as a specialized framework/platform.
>
> Here are some of my thoughts to the questions raised.
> 1. I am not exactly sure about the implementation details, but currently
> Flink's Table factory discovery system [2] also handles loading of various
> different table system in a way. I was wondering if this would be an
> acceptable way to discover services. similar question was also raised in
> the discussion for supporting Service-Provider pattern [3] in Flink
> security module installation.
> 3. I am assuming this means DataStream object directly, not via
> StreamExecutionEnvironment.addSource(sourceFunction) [4]. In this case, how
> would the DataStream object be instantiated? any reason we would want to
> support direct DataStream source/sink without going through the source/sink
> factory?
>
> I have not thought through about (2) and (4), but I think they are all very
> valid questions as we also suffer from these pain points managing it with
> our prod environment.
> Looking forward to contributing to this effort!
>
> Best,
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-7373 <https://issues.apache.org/jira/browse/FLINK-7373>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#define-a-tablefactory <https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#define-a-tablefactory>
> [3]
> https://docs.google.com/document/d/1j96kjf-Nbk8Kii276SLSajhpCUuNHvNO5feZUnnPcPE/ <https://docs.google.com/document/d/1j96kjf-Nbk8Kii276SLSajhpCUuNHvNO5feZUnnPcPE/>
> [4]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumer <https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumer>
>
>
>
> On Wed, Apr 10, 2019 at 12:25 PM Piotr Nowojski <[hidden email] <mailto:[hidden email]>> wrote:
>
>> Hi Biao,
>>
>> No there is not. The current scope is so small and it was actually very
>> easy to implement, that design doc might have been an overkill for it.
>> While for the future plans we decided consciously to not account for them
>> and not plan at the moment, but to tackle them lazily.
>>
>> Idea behind current proposal is described at the moment in java docs [1],
>> or in the tests and there will be of course a documentation update. Gist is
>> to have `./plugins` directory, with subdirectories of jars:
>>
>> ./plugins/pluginA/foo-1.0.jar
>> ./plugins/pluginA/bar-0.9.jar
>>
>> ./plugins/pluginB/foo-2.0.jar
>>
>> All of the jars from each plugin subdirectory will be loaded in separate
>> class loaders and instead of looking for `FileSystem.class` implementations
>> in the Flink’s main class loader, we will iterate over those plugins’ class
>> loaders and search inside them. Which is a simple extension of the current
>> mechanism.
>>
>> In the future we will need some discussion on how we want to evolve this
>> mechanism to handle more than just `FileSystem`s:
>> 1. Do we want to introduce some centralised architecture, in which we would
>> discover `Plugin.class` implementations and a `Plugin` would provide
>> `FileSystem`, `MetricReporter`, `Connector` implementations? Or would we
>> discover `FileSystem`, `MetricReporter`, `Connector` separately?
>> 2. Do we want to support “dynamically” loaded plugins, provided during
>> job submission?
>> 3. Do we want to somehow support DataStream Sources/Sinks as plugins? If
>> yes, what would the API look like? Some Sinks/Sources factory interface
>> shared between connectors?
>> 4. How do we expose `Plugins` inside operators, especially in Table API/SQL
>> operators, to support TableSource/SinkFactories loaded via plugins?
>> 5. …?
>>
>> Those are some of the questions that we are intentionally trying to avoid
>> at the moment.
>>
>> Piotrek
>>
>> [1]
>> https://github.com/apache/flink/pull/8038/commits/06d07e846d1b3682538cea203de1371fc79b9940#diff-0599320f7dc729e412841c58534d6fe5
>>
>>> On 10 Apr 2019, at 17:40, Biao Liu <[hidden email]> wrote:
>>>
>>> Hi Stefan & Piotr,
>>> Thank you for starting this discussion. As Zhijiang said, class conflicts
>>> cause a lot of trouble in our production environment.
>>> I was wondering whether there is a design document yet. It would help in
>>> understanding the PR and even the whole picture since, as Piotr said, in
>>> the future it could be extended to other modules.
>>>
>>> On Wed, Apr 10, 2019 at 11:22 PM Stefan Richter <[hidden email]> wrote:
>>>
>>>> Thank you Piotr for bringing this discussion to the mailing list! As it
>>>> was not explicitly mentioned in the first email, I wanted to add that there
>>>> is also already an open PR [1] with my implementation of the basic plugin
>>>> mechanism for FileSystem. Looking forward to some feedback from the
>>>> community.
>>>>
>>>>
>>>> [1] https://github.com/apache/flink/pull/8038
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>>> On 10. Apr 2019, at 17:08, zhijiang <[hidden email]> wrote:
>>>>>
>>>>> Thanks Piotr for proposing this new feature.
>>>>>
>>>>> A solution for the class loader issue is really helpful in production, and
>>>>> we often encountered this pain point before.
>>>>> This pluggable mechanism might open up more possibilities.
>>>>> Hope to see progress soon. :)
>>>>>
>>>>> Best,
>>>>> Zhijiang
>>>>> ------------------------------------------------------------------
>>>>> From: Jeff Zhang <[hidden email]>
>>>>> Send Time: Wed, Apr 10, 2019, 22:01
>>>>> To: dev <[hidden email]>
>>>>> Subject: Re: Introducing Flink's Plugin mechanism
>>>>>
>>>>> Thanks, Piotr, for driving this plugin mechanism. Pluggability is pretty
>>>>> important for the Flink ecosystem.
>>>>>
>>>>> On Wed, Apr 10, 2019 at 5:48 PM Piotr Nowojski <[hidden email]> wrote:
>>>>>
>>>>>> Hi Flink developers,
>>>>>>
>>>>>> I would like to introduce a new plugin loading mechanism that we are
>>>>>> working on right now [1]. The idea is quite simple: isolate services in
>>>>>> separate independent class loaders, so that classes and dependencies do not
>>>>>> leak between them and/or Flink runtime itself. Currently we have quite some
>>>>>> problems with dependency convergence in multiple places. Some of them we
>>>>>> are solving by shading (built in file systems, metrics), some we are
>>>>>> forcing users to deal with them (custom file systems/metrics) and others we
>>>>>> do not solve (connectors - we do not support using different Kafka versions
>>>>>> in the same job/SQL). With proper plugins, that are loaded in independent
>>>>>> class loaders, those issues could be solved in a generic way.
>>>>>>
>>>>>> Current scope of implementation targets only file systems, without a
>>>>>> centralised Plugin architecture and with Plugins that are only “statically”
>>>>>> initialised at the TaskManager and JobManager start up. More or less we are
>>>>>> just replacing the way how FileSystem’s implementations are discovered &
>>>>>> loaded.
>>>>>>
>>>>>> In the future this idea could be extended to different modules, like
>>>>>> metric reporters, connectors, functions/data types (especially in SQL),
>>>>>> state backends, internal storage or other future efforts. Some of those
>>>>>> would be easier than others: the metric reporters would require some
>>>>>> smaller refactor, while connectors would require some bigger API design
>>>>>> discussions, which I would like to avoid at the moment. Nevertheless I
>>>>>> wanted to reach out with this idea so if some other potential use cases pop
>>>>>> up in the future, more people will be aware.
>>>>>>
>>>>>> Piotr Nowojski
>>>>>>
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11952
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang