[VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management


[VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Wei Zhong-2

Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

jincheng sun
+1

Wei Zhong <[hidden email]> wrote on Sat, Oct 12, 2019 at 8:41 PM:

> Hi all,
>
> I would like to start the vote for FLIP-78 [1], which has been discussed and
> reached consensus in the discussion thread [2].
>
> The vote will be open for at least 72 hours. I'll try to close it by
> 2019-10-16 18:00 UTC, unless there is an objection or not enough votes.
>
> Thanks,
> Wei
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-78%3A+Flink+Python+UDF+Environment+and+Dependency+Management
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-UDF-Environment-and-Dependency-Management-td33514.html

Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Dian Fu-2
Hi Wei,

+1 (non-binding). Thanks for driving this.

Thanks,
Dian



Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Hequn Cheng
+1

Good job, Wei!

Best, Hequn


Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Jeff Zhang
+1



--
Best Regards

Jeff Zhang

Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Thomas Weise
Sorry for joining the discussion late.

The Beam environment already supports artifact staging; it works out of the
box with the Docker environment. I think it would be helpful to explain in
the FLIP how this proposal relates to what Beam offers and how it would be
integrated.

Thanks,
Thomas



Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Wei Zhong-2
Hi Thomas,

Thanks a lot for your suggestion!

As you can see from the "Goals" section, this FLIP focuses on dependency management in process mode. However, the APIs and design proposed in this FLIP also apply to the docker mode. So it makes sense to me to also describe how this design is integrated with the artifact staging service of Apache Beam in docker mode. I have updated the design doc and look forward to your feedback.

Thanks,
Wei



Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

mxm
I'm also late to the party here :) When I saw the first draft, I was
thinking how exactly the design doc would tie in with Beam. Thanks for
the update.

A couple of comments in this regard:

> Flink has provided a distributed cache mechanism and allows users to upload their files using the "registerCachedFile" method in ExecutionEnvironment/StreamExecutionEnvironment. The Python files users specify through "add_python_file", "set_python_requirements" and "add_python_archive" are also uploaded through this method eventually.

For process-based execution we use Flink's cache distribution instead of
Beam's artifact staging.

> Apache Beam Portability Framework already supports artifact staging that works out of the box with the Docker environment. We can use the artifact staging service defined in Apache Beam to transfer the dependencies from the operator to Python SDK harness running in the docker container.

Do we want to implement two different ways of staging artifacts? It
seems sensible to use the same artifact staging functionality also for
the process-based execution. Apart from being simpler, this would also
allow the process-based execution to run in other environments than the
Flink TaskManager environment.
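As a rough illustration of how the user-facing calls quoted above could funnel into Flink's distributed cache, here is a hedged sketch; every class and method name below is made up for illustration and is not the actual PyFlink implementation:

```python
class FakeExecutionEnvironment:
    """Stand-in for (Stream)ExecutionEnvironment's registerCachedFile."""

    def __init__(self):
        self.cached_files = {}  # cache key -> local file path

    def register_cached_file(self, path, name):
        self.cached_files[name] = path


class PythonDependencyManager:
    """Illustrative: each user-facing call ends up registering a cached file."""

    def __init__(self, env):
        self.env = env
        self.counter = 0

    def _register(self, prefix, path):
        key = f"python_{prefix}_{self.counter}"
        self.counter += 1
        self.env.register_cached_file(path, key)
        return key

    def add_python_file(self, path):
        return self._register("file", path)

    def set_python_requirements(self, requirements_path):
        return self._register("requirements", requirements_path)

    def add_python_archive(self, archive_path):
        return self._register("archive", archive_path)


env = FakeExecutionEnvironment()
deps = PythonDependencyManager(env)
deps.add_python_file("/tmp/utils.py")
deps.set_python_requirements("/tmp/requirements.txt")
print(sorted(env.cached_files.keys()))
# -> ['python_file_0', 'python_requirements_1']
```

The point of the sketch is only that all three APIs converge on the same distributed-cache upload path, which is what makes the process-mode shortcut discussed here possible.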

Thanks,
Max


Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

jincheng sun
Hi Max,

Sorry for the late reply. Regarding the issue you mentioned above, I'm glad
to share my thoughts:

> For process-based execution we use Flink's cache distribution instead of
Beam's artifact staging.

In the current design, we use Flink's cache distribution to upload users'
files from the client to the cluster in both docker mode and process mode.
That is, Flink's cache distribution and Beam's artifact staging service work
together in docker mode.


> Do we want to implement two different ways of staging artifacts? It seems
sensible to use the same artifact staging functionality also for the
process-based execution.

I agree that the implementation would be simpler if we used the same artifact
staging functionality for the process-based execution as well. However, it is
not the best for performance, as it would introduce an additional network
transmission: in process mode the TaskManager and the Python worker share the
same environment, in which case the user files in Flink's distributed cache
can be accessed by the Python worker directly. We do not need the staging
service in this case.

> Apart from being simpler, this would also allow the process-based
execution to run in other environments than the Flink TaskManager
environment.

IMHO, this case is more like docker mode, and we can share or reuse the code
of Beam's docker mode. Furthermore, in this case the Python worker is
launched by the operator, so it is always in the same environment as the
operator.

Thanks again for your feedback; it is valuable for finding the best final
architecture.

Feel free to correct me if there is anything incorrect.

Best,
Jincheng


Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Thomas Weise
Beam artifact staging currently relies on a shared file system, and there are
limitations, for example when running locally with Docker and a local FS. It
sounds like a distributed-cache-based implementation might be a good
(better?) option for artifact staging even for the Beam Flink runner?

If so, can the implementation you propose be compatible with the Beam
artifact staging service so that it can be plugged into the Beam Flink
runner?

Thanks,
Thomas


On Mon, Oct 21, 2019 at 2:34 AM jincheng sun <[hidden email]>
wrote:

> Hi Max,
>
> Sorry for the late reply. Regarding the issue you mentioned above, I'm glad
> to share my thoughts:
>
> > For process-based execution we use Flink's cache distribution instead of
> Beam's artifact staging.
>
> In current design, we use Flink's cache distribution to upload users' files
> from client to cluster in both docker mode and process mode. That is,
> Flink's cache distribution and Beam's artifact staging service work
> together in docker mode.
>
>
> > Do we want to implement two different ways of staging artifacts? It seems
> sensible to use the same artifact staging functionality also for the
> process-based execution.
>
> I agree that the implementation will be simple if we use the same artifact
> staging functionality also for the process-based execution. However, it's
> not the best for performance as it will introduce an additional network
> transmission, as in process mode TaskManager and python worker share the
> same environment, in which case the user files in Flink Distribute Cache
> can be accessed by python worker directly. We do not need the staging
> service in this case.
>
> > Apart from being simpler, this would also allow the process-based
> execution to run in other environments than the Flink TaskManager
> environment.
>
> IMHO, this case is more like docker mode, and we can share or reuse the
> code of Beam docker mode. Furthermore, in this case python worker is
> launched by the operator, so it is always in the same environment as the
> operator.
>
> Thanks again for your feedback, and it is valuable for find out the final
> best architecture.
>
> Feel free to correct me if there is anything incorrect.
>
> Best,
> Jincheng
>
> Maximilian Michels <[hidden email]> 于2019年10月16日周三 下午4:23写道:
>
> > I'm also late to the party here :) When I saw the first draft, I was
> > thinking how exactly the design doc would tie in with Beam. Thanks for
> > the update.
> >
> > A couple of comments with this regard:
> >
> > > Flink has provided a distributed cache mechanism and allows users to
> > upload their files using "registerCachedFile" method in
> > ExecutionEnvironment/StreamExecutionEnvironment. The python files users
> > specified through "add_python_file", "set_python_requirements" and
> > "add_python_archive" are also uploaded through this method eventually.
> >
> > For process-based execution we use Flink's cache distribution instead of
> > Beam's artifact staging.
> >
> > > Apache Beam Portability Framework already supports artifact staging
> that
> > works out of the box with the Docker environment. We can use the artifact
> > staging service defined in Apache Beam to transfer the dependencies from
> > the operator to Python SDK harness running in the docker container.
> >
> > Do we want to implement two different ways of staging artifacts? It
> > seems sensible to use the same artifact staging functionality also for
> > the process-based execution. Apart from being simpler, this would also
> > allow the process-based execution to run in other environments than the
> > Flink TaskManager environment.
> >
> > Thanks,
> > Max
> >
> > On 15.10.19 11:13, Wei Zhong wrote:
> > > Hi Thomas,
> > >
> > > Thanks a lot for your suggestion!
> > >
> > > As you can see from the section "Goals" that this FLIP focuses on the
> > dependency management in process mode. However, the APIs and design
> > proposed in this FLIP also applies for the docker mode. So it makes sense
> > to me to also describe how this design is integated to the artifact
> staging
> > service of Apache Beam in docker mode. I have updated the design doc and
> > looking forward to your feedback.
> > >
> > > Thanks,
> > > Wei
> > >
> > >> 在 2019年10月15日,01:54,Thomas Weise <[hidden email]> 写道:
> > >>
> > >> Sorry for joining the discussion late.
> > >>
> > >> The Beam environment already supports artifact staging, it works out
> of
> > the
> > >> box with the Docker environment. I think it would be helpful to
> explain
> > in
> > >> the FLIP how this proposal relates to what Beam offers / how it would
> be
> > >> integrated.
> > >>
> > >> Thanks,
> > >> Thomas
> > >>
> > >>
> > >> On Mon, Oct 14, 2019 at 8:09 AM Jeff Zhang <[hidden email]> wrote:
> > >>
> > >>> +1
> > >>>
> > >>> Hequn Cheng <[hidden email]> 于2019年10月14日周一 下午10:55写道:
> > >>>
> > >>>> +1
> > >>>>
> > >>>> Good job, Wei!
> > >>>>
> > >>>> Best, Hequn
> > >>>>
> > >>>> On Mon, Oct 14, 2019 at 2:54 PM Dian Fu <[hidden email]>
> > wrote:
> > >>>>
> > >>>>> Hi Wei,
> > >>>>>
> > >>>>> +1 (non-binding). Thanks for driving this.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Dian
> > >>>>>
> > >>>>>> 在 2019年10月14日,下午1:40,jincheng sun <[hidden email]> 写道:
> > >>>>>>
> > >>>>>> +1
> > >>>>>>
> > >>>>>> Wei Zhong <[hidden email]> 于2019年10月12日周六 下午8:41写道:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> I would like to start the vote for FLIP-78[1] which is discussed
> > and
> > >>>>>>> reached consensus in the discussion thread[2].
> > >>>>>>>
> > >>>>>>> The vote will be open for at least 72 hours. I'll try to close it
> > by
> > >>>>>>> 2019-10-16 18:00 UTC, unless there is an objection or not enough
> > >>>> votes.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Wei
> > >>>>>>>
> > >>>>>>> [1]
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-78%3A+Flink+Python+UDF+Environment+and+Dependency+Management
> > >>>>>>> <
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-78:+Flink+Python+UDF+Environment+and+Dependency+Management
> > >>>>>>>>
> > >>>>>>> [2]
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-UDF-Environment-and-Dependency-Management-td33514.html
> > >>>>>>> <
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-UDF-Environment-and-Dependency-Management-td33514.html
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>> Best Regards
> > >>>
> > >>> Jeff Zhang
> > >>>
> > >
> >
>

Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

jincheng sun
Hi Thomas,

Thanks for sharing your thoughts. I think improving on the limitations of
Beam artifact staging is a good topic (for Beam).

My understanding is as follows:

For Beam (data):
    Stage 1: BeamClient ------> JobService (data will be uploaded to DFS).
    Stage 2: JobService (FlinkClient) ------> Flink job (the operator
downloads the data from DFS).
    Stage 3: Operator ------> Harness (artifact staging service).

For Flink (data):
    Stage 1: FlinkClient (local data is uploaded to the BlobServer via the
distributed cache) ------> Operator (data is downloaded from the BlobServer).
This does not have to depend on DFS.
    Stage 2: Operator ------> Harness (for docker we use the artifact staging
service).
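The two pipelines above can be contrasted in a small sketch (all names are illustrative only, not taken from the Flink or Beam codebases):

```python
# Each hop is (source, destination, transport/store used).
BEAM_PIPELINE = [
    ("BeamClient", "JobService", "DFS"),
    ("JobService", "Flink operator", "DFS"),
    ("Operator", "SDK harness", "artifact staging service"),
]

FLINK_PIPELINE = [
    ("FlinkClient", "Operator", "BlobServer (distributed cache)"),
    ("Operator", "SDK harness", "artifact staging service"),
]


def needs_dfs(pipeline):
    # The DFS dependency discussed here appears only in the Beam path;
    # Flink's path ships files through the BlobServer instead.
    return any(store == "DFS" for _, _, store in pipeline)


print(needs_dfs(BEAM_PIPELINE), needs_dfs(FLINK_PIPELINE))  # -> True False
```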

So, I think Beam has to depend on DFS in Stage 1, and Stage 2 could use the
distributed cache if we remove Beam's DFS dependency in Stage 1 (of course we
need more detail here). We can bring up that discussion on a separate Beam
dev@ thread. The current discussion focuses on the Flink 1.10 version of UDF
environment and dependency management for Python, so I recommend voting on
the current thread for Flink 1.10 and discussing Beam artifact staging
improvements separately on Beam dev@.

What do you think?

Best,
Jincheng

Thomas Weise <[hidden email]> 于2019年10月21日周一 下午10:25写道:

> Beam artifact staging currently relies on shared file system and there are
> limitations, for example when running locally with Docker and local FS. It
> sounds like a distributed cache based implementation might be a good
> (better?) option for artifact staging even for the Beam Flink runner?
>
> If so, can the implementation you propose be compatible with the Beam
> artifact staging service so that it can be plugged into the Beam Flink
> runner?
>
> Thanks,
> Thomas
>
>
> On Mon, Oct 21, 2019 at 2:34 AM jincheng sun <[hidden email]>
> wrote:
>
> > Hi Max,
> >
> > Sorry for the late reply. Regarding the issue you mentioned above, I'm
> glad
> > to share my thoughts:
> >
> > > For process-based execution we use Flink's cache distribution instead
> of
> > Beam's artifact staging.
> >
> > In current design, we use Flink's cache distribution to upload users'
> files
> > from client to cluster in both docker mode and process mode. That is,
> > Flink's cache distribution and Beam's artifact staging service work
> > together in docker mode.
> >
> >
> > > Do we want to implement two different ways of staging artifacts? It
> seems
> > sensible to use the same artifact staging functionality also for the
> > process-based execution.
> >
> > I agree that the implementation will be simple if we use the same
> artifact
> > staging functionality also for the process-based execution. However, it's
> > not the best for performance as it will introduce an additional network
> > transmission, as in process mode TaskManager and python worker share the
> > same environment, in which case the user files in Flink Distribute Cache
> > can be accessed by python worker directly. We do not need the staging
> > service in this case.
> >
> > > Apart from being simpler, this would also allow the process-based
> > execution to run in other environments than the Flink TaskManager
> > environment.
> >
> > IMHO, this case is more like docker mode, and we can share or reuse the
> > code of Beam docker mode. Furthermore, in this case python worker is
> > launched by the operator, so it is always in the same environment as the
> > operator.
> >
> > Thanks again for your feedback, and it is valuable for find out the final
> > best architecture.
> >
> > Feel free to correct me if there is anything incorrect.
> >
> > Best,
> > Jincheng
> >
> > Maximilian Michels <[hidden email]> 于2019年10月16日周三 下午4:23写道:
> >
> > > I'm also late to the party here :) When I saw the first draft, I was
> > > thinking how exactly the design doc would tie in with Beam. Thanks for
> > > the update.
> > >
> > > A couple of comments with this regard:
> > >
> > > > Flink has provided a distributed cache mechanism and allows users to
> > > upload their files using "registerCachedFile" method in
> > > ExecutionEnvironment/StreamExecutionEnvironment. The python files users
> > > specified through "add_python_file", "set_python_requirements" and
> > > "add_python_archive" are also uploaded through this method eventually.
> > >
> > > For process-based execution we use Flink's cache distribution instead
> of
> > > Beam's artifact staging.
> > >
> > > > Apache Beam Portability Framework already supports artifact staging
> > that
> > > works out of the box with the Docker environment. We can use the
> artifact
> > > staging service defined in Apache Beam to transfer the dependencies
> from
> > > the operator to Python SDK harness running in the docker container.
> > >
> > > Do we want to implement two different ways of staging artifacts? It
> > > seems sensible to use the same artifact staging functionality also for
> > > the process-based execution. Apart from being simpler, this would also
> > > allow the process-based execution to run in other environments than the
> > > Flink TaskManager environment.
> > >
> > > Thanks,
> > > Max
> > >
> > > On 15.10.19 11:13, Wei Zhong wrote:
> > > > Hi Thomas,
> > > >
> > > > Thanks a lot for your suggestion!
> > > >
> > > > As you can see from the "Goals" section, this FLIP focuses on
> > > > dependency management in process mode. However, the APIs and design
> > > > proposed in this FLIP also apply to docker mode. So it makes sense
> > > > to me to also describe how this design integrates with the artifact
> > > > staging service of Apache Beam in docker mode. I have updated the
> > > > design doc and am looking forward to your feedback.
> > > >
> > > > Thanks,
> > > > Wei
> > > >
> > > >> On Oct 15, 2019, at 01:54, Thomas Weise <[hidden email]> wrote:
> > > >>
> > > >> Sorry for joining the discussion late.
> > > >>
> > > >> The Beam environment already supports artifact staging, it works out
> > of
> > > the
> > > >> box with the Docker environment. I think it would be helpful to
> > explain
> > > in
> > > >> the FLIP how this proposal relates to what Beam offers / how it
> would
> > be
> > > >> integrated.
> > > >>
> > > >> Thanks,
> > > >> Thomas
> > > >>
> > > >>
> > > >> On Mon, Oct 14, 2019 at 8:09 AM Jeff Zhang <[hidden email]>
> wrote:
> > > >>
> > > >>> +1
> > > >>>
> > > >>> Hequn Cheng <[hidden email]> wrote on Mon, Oct 14, 2019 at 10:55 PM:
> > > >>>
> > > >>>> +1
> > > >>>>
> > > >>>> Good job, Wei!
> > > >>>>
> > > >>>> Best, Hequn
> > > >>>>
> > > >>>> On Mon, Oct 14, 2019 at 2:54 PM Dian Fu <[hidden email]>
> > > wrote:
> > > >>>>
> > > >>>>> Hi Wei,
> > > >>>>>
> > > >>>>> +1 (non-binding). Thanks for driving this.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Dian
> > > >>>>>
> > > >>>>>> On Oct 14, 2019, at 1:40 PM, jincheng sun <[hidden email]> wrote:
> > > >>>>>>
> > > >>>>>> +1
> > > >>>>>>
> > > >>>>>> Wei Zhong <[hidden email]> wrote on Sat, Oct 12, 2019 at 8:41 PM:
> > > >>>>>>
> > > >>>>>>> Hi all,
> > > >>>>>>>
> > > >>>>>>> I would like to start the vote for FLIP-78[1] which is
> discussed
> > > and
> > > >>>>>>> reached consensus in the discussion thread[2].
> > > >>>>>>>
> > > >>>>>>> The vote will be open for at least 72 hours. I'll try to close
> it
> > > by
> > > >>>>>>> 2019-10-16 18:00 UTC, unless there is an objection or not
> enough
> > > >>>> votes.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Wei
> > > >>>>>>>
> > > >>>>>>> [1]
> > > >>>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-78%3A+Flink+Python+UDF+Environment+and+Dependency+Management
> > > >>>>>>> <
> > > >>>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-78:+Flink+Python+UDF+Environment+and+Dependency+Management
> > > >>>>>>>>
> > > >>>>>>> [2]
> > > >>>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-UDF-Environment-and-Dependency-Management-td33514.html
> > > >>>>>>> <
> > > >>>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-UDF-Environment-and-Dependency-Management-td33514.html
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> Best Regards
> > > >>>
> > > >>> Jeff Zhang
> > > >>>
> > > >
> > >
> >
>
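The flow quoted above (client-side registration via "registerCachedFile", with "add_python_file", "set_python_requirements" and "add_python_archive" funneling into it) can be sketched as a toy model. The class and method names below mirror the API shape from the quoted design doc but are illustrative assumptions, not PyFlink's real implementation:

```python
# A toy, self-contained model: the client registers dependency files in a
# distributed cache under unique keys, and the operator later resolves them
# by key. Stands in for Flink's BlobServer-backed distribution mechanism.

class DistributedCache:
    """Plays the role of Flink's distributed cache (BlobServer-backed)."""

    def __init__(self):
        self._store = {}

    def register_cached_file(self, path, key):
        # Client side: "upload" the file under a unique key.
        self._store[key] = path

    def get_cached_file(self, key):
        # Operator side: "download" the file by key.
        return self._store[key]


class PythonDependencyManager:
    """Sketch of how add_python_file / set_python_requirements /
    add_python_archive could all funnel into registerCachedFile."""

    def __init__(self, cache):
        self._cache = cache
        self._counter = 0

    def _register(self, path, prefix):
        key = "%s_%d" % (prefix, self._counter)
        self._counter += 1
        self._cache.register_cached_file(path, key)
        return key

    def add_python_file(self, path):
        return self._register(path, "python_file")

    def set_python_requirements(self, path):
        return self._register(path, "requirements")

    def add_python_archive(self, path):
        return self._register(path, "archive")


cache = DistributedCache()
deps = PythonDependencyManager(cache)
key = deps.add_python_file("/tmp/udf_utils.py")
print(cache.get_cached_file(key))  # the operator resolves the same path
```

The point of the sketch is only that all three user-facing methods share one underlying distribution channel, which is why both process mode and docker mode can reuse it for the client-to-cluster hop.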

Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Thomas Weise
Hi Jincheng,

Yes, this topic can be further discussed on the Beam ML. The only reason I
brought it up here is that it would be desirable, from the Beam Flink runner's
perspective, for the artifact staging mechanism that you work on to be
reusable.

Stage 1 in Beam is also up to the runner: artifact staging is a service
discovered from the job server, and the fact that the Flink job server
currently uses DFS is not set in stone. My interest was more in the
assumptions about the artifact structure, which may or may not allow for a
reusable implementation.

+1 for the proposal otherwise

Thomas


On Mon, Oct 21, 2019 at 8:40 PM jincheng sun <[hidden email]>
wrote:

> Hi Thomas,
>
> Thanks for sharing your thoughts. I think improving and solving the
> limitations of Beam artifact staging is a good topic (for Beam).
>
> My understanding is as follows:
>
> For Beam (data):
>     Stage 1: BeamClient ------> JobService (data is uploaded to DFS).
>     Stage 2: JobService (FlinkClient) ------> FlinkJob (the operator
> downloads the data from DFS).
>     Stage 3: Operator ------> Harness (artifact staging service).
>
> For Flink (data):
>     Stage 1: FlinkClient (local data is uploaded to the BlobServer via the
> distributed cache) ------> Operator (data is downloaded from the
> BlobServer). No dependency on DFS.
>     Stage 2: Operator ------> Harness (for docker we use the artifact
> staging service).
>
> So, I think Beam has to depend on DFS in Stage 1, and Stage 2 could use the
> distributed cache if we remove Beam's DFS dependency in Stage 1 (of course
> we need more detail here). We can bring up that discussion in a separate
> thread on the Beam dev@ ML. The current discussion focuses on UDF
> environment and dependency management for Python in Flink 1.10, so I
> recommend voting here for Flink 1.10 and discussing Beam artifact staging
> improvements separately on Beam dev@.
>
> What do you think?
>
> Best,
> Jincheng
>
> Thomas Weise <[hidden email]> wrote on Mon, Oct 21, 2019 at 10:25 PM:
>
> > Beam artifact staging currently relies on a shared file system, and there
> > are limitations, for example when running locally with Docker and a local
> > FS. It sounds like a distributed-cache-based implementation might be a
> > good (better?) option for artifact staging even for the Beam Flink runner?
> >
> > If so, can the implementation you propose be compatible with the Beam
> > artifact staging service so that it can be plugged into the Beam Flink
> > runner?
> >
> > Thanks,
> > Thomas
> >
> >
> > On Mon, Oct 21, 2019 at 2:34 AM jincheng sun <[hidden email]>
> > wrote:
> >
> > > Hi Max,
> > >
> > > Sorry for the late reply. Regarding the issue you mentioned above, I'm
> > glad
> > > to share my thoughts:
> > >
> > > > For process-based execution we use Flink's cache distribution instead
> > of
> > > Beam's artifact staging.
> > >
> > > In the current design, we use Flink's cache distribution to upload
> > > users' files from the client to the cluster in both docker mode and
> > > process mode. That is, Flink's cache distribution and Beam's artifact
> > > staging service work together in docker mode.
> > >
> > >
> > > > Do we want to implement two different ways of staging artifacts? It
> > seems
> > > sensible to use the same artifact staging functionality also for the
> > > process-based execution.
> > >
> > > I agree that the implementation will be simpler if we use the same
> > > artifact staging functionality for the process-based execution as well.
> > > However, it's not the best for performance, as it introduces an
> > > additional network transmission: in process mode the TaskManager and the
> > > Python worker share the same environment, so the user files in Flink's
> > > distributed cache can be accessed by the Python worker directly. We do
> > > not need the staging service in this case.
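The performance argument in this paragraph can be illustrated with a minimal sketch (illustrative assumptions only, not Flink's actual code): in process mode the Python worker shares the TaskManager's filesystem and reads a cached dependency in place, while a docker-based worker needs an extra transfer step into its own filesystem, which is what artifact staging provides.

```python
# Toy model of dependency provisioning in the two execution modes.
import os
import shutil
import tempfile


def provide_dependency(cached_path, mode, container_dir=None):
    if mode == "process":
        # Same environment: no extra copy, the worker reads the file directly.
        return cached_path
    if mode == "docker":
        # Separate environment: simulate artifact staging with a copy into
        # the "container" filesystem.
        staged = os.path.join(container_dir, os.path.basename(cached_path))
        shutil.copy(cached_path, staged)
        return staged
    raise ValueError("unknown mode: %s" % mode)


tm_dir = tempfile.mkdtemp()  # plays the TaskManager's local cache directory
dep = os.path.join(tm_dir, "requirements.txt")
with open(dep, "w") as f:
    f.write("numpy\n")

print(provide_dependency(dep, "process") == dep)            # no copy needed
container = tempfile.mkdtemp()  # plays the docker container's filesystem
print(provide_dependency(dep, "docker", container) != dep)  # staged copy
```

The extra copy in the docker branch is the "additional network transmission" the message refers to; process mode avoids it entirely.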

Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

jincheng sun
Hi Thomas,

Thanks for your explanation. I understand your original intention. I will
seriously consider this issue. Once I have an initial solution, I will
bring up a further discussion on the Beam ML.

Thanks for your voting. :)

Best,
Jincheng



Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Wei Zhong-2
Hi Max,

Are there any other concerns from your side? I would appreciate it if you could give some feedback and vote on this.

Best,
Wei

> 在 2019年10月25日,09:33,jincheng sun <[hidden email]> 写道:
>
> Hi Thomas,
>
> Thanks for your explanation. I understand your original intention. I will
> seriously consider this issue. After I have the initial solution, I will
> bring up a further discussion in Beam ML.
>
> Thanks for your voting. :)
>
> Best,
> Jincheng
>
>
> Thomas Weise <[hidden email]> 于2019年10月25日周五 上午7:32写道:
>
>> Hi Jincheng,
>>
>> Yes, this topic can be further discussed on the Beam ML. The only reason I
>> brought it up here is that it would be desirable from Beam Flink runner
>> perspective for the artifact staging mechanism that you work on to be
>> reusable.
>>
>> Stage 1 in Beam is also up to the runner, artifact staging is a service
>> discovered from the job server and that the Flink job server currently uses
>> DFS is not set in stone. My interest was more regarding assumptions
>> regarding the artifact structure, which may or may not allow for reusable
>> implementation.
>>
>> +1 for the proposal otherwise
>>
>> Thomas
>>
>>
>> On Mon, Oct 21, 2019 at 8:40 PM jincheng sun <[hidden email]>
>> wrote:
>>
>>> Hi Thomas,
>>>
>>> Thanks for sharing your thoughts. I think improve and solve the
>> limitations
>>> of the Beam artifact staging is good topic(For beam).
>>>
>>> As I understand it as follows:
>>>
>>> For Beam(data):
>>>    Stage1: BeamClient ------> JobService (data will be upload to DFS).
>>>    Stage2: JobService(FlinkClient) ------>  FlinkJob (operator download
>>> the data from DFS)
>>>    Stage3: Operator ------> Harness(artifact staging service)
>>>
>>> For Flink(data):
>>>    Stage1: FlinkClient(data(local) upload to BlobServer using distribute
>>> cache) ------> Operator (data will be download from BlobServer). Do not
>>> have to depend on DFS.
>>>    Stage2: Operator ------> Harness(for docker we using artifact staging
>>> service)
>>>
>>> So, I think Beam have to depend on DFS in Stage1. and Stage2 can using
>>> distribute cache if we remove the dependency of DFS for Beam in
>> Stage1.(Of
>>> course we need more detail here),  we can bring up the discussion in a
>>> separate Beam dev@ ML, the current discussion focuses on Flink 1.10
>>> version
>>> of  UDF Environment and Dependency Management for python, so I recommend
>>> voting in the current ML for Flink 1.10, Beam artifact staging
>> improvements
>>> are discussed in a separate Beam dev@.
>>>
>>> What do you think?
>>>
>>> Best,
>>> Jincheng
>>>
>>> Thomas Weise <[hidden email]> 于2019年10月21日周一 下午10:25写道:
>>>
>>>> Beam artifact staging currently relies on shared file system and there
>>> are
>>>> limitations, for example when running locally with Docker and local FS.
>>> It
>>>> sounds like a distributed cache based implementation might be a good
>>>> (better?) option for artifact staging even for the Beam Flink runner?
>>>>
>>>> If so, can the implementation you propose be compatible with the Beam
>>>> artifact staging service so that it can be plugged into the Beam Flink
>>>> runner?
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Mon, Oct 21, 2019 at 2:34 AM jincheng sun <[hidden email]
>>>
>>>> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> Sorry for the late reply. Regarding the issue you mentioned above,
>> I'm
>>>> glad
>>>>> to share my thoughts:
>>>>>
>>>>>> For process-based execution we use Flink's cache distribution
>> instead
>>>> of
>>>>> Beam's artifact staging.
>>>>>
>>>>> In current design, we use Flink's cache distribution to upload users'
>>>> files
>>>>> from client to cluster in both docker mode and process mode. That is,
>>>>> Flink's cache distribution and Beam's artifact staging service work
>>>>> together in docker mode.
>>>>>
>>>>>
>>>>>> Do we want to implement two different ways of staging artifacts? It
>>>> seems
>>>>> sensible to use the same artifact staging functionality also for the
>>>>> process-based execution.
>>>>>
>>>>> I agree that the implementation will be simple if we use the same
>>>> artifact
>>>>> staging functionality also for the process-based execution. However,
>>> it's
>>>>> not the best for performance as it will introduce an additional
>> network
>>>>> transmission, as in process mode TaskManager and python worker share
>>> the
>>>>> same environment, in which case the user files in Flink Distribute
>>> Cache
>>>>> can be accessed by python worker directly. We do not need the staging
>>>>> service in this case.
>>>>>
>>>>>> Apart from being simpler, this would also allow the process-based
>>>>> execution to run in other environments than the Flink TaskManager
>>>>> environment.
>>>>>
>>>>> IMHO, this case is more like docker mode, and we can share or reuse
>> the
>>>>> code of Beam docker mode. Furthermore, in this case python worker is
>>>>> launched by the operator, so it is always in the same environment as
>>> the
>>>>> operator.
>>>>>
>>>>> Thanks again for your feedback, and it is valuable for find out the
>>> final
>>>>> best architecture.
>>>>>
>>>>> Feel free to correct me if there is anything incorrect.
>>>>>
>>>>> Best,
>>>>> Jincheng
>>>>>
>>>>> Maximilian Michels <[hidden email]> 于2019年10月16日周三 下午4:23写道:
>>>>>
>>>>>> I'm also late to the party here :) When I saw the first draft, I
>> was
>>>>>> thinking how exactly the design doc would tie in with Beam. Thanks
>>> for
>>>>>> the update.
>>>>>>
>>>>>> A couple of comments with this regard:
>>>>>>
>>>>>>> Flink has provided a distributed cache mechanism and allows users
>>> to
>>>>>> upload their files using "registerCachedFile" method in
>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. The python files
>>> users
>>>>>> specified through "add_python_file", "set_python_requirements" and
>>>>>> "add_python_archive" are also uploaded through this method
>>> eventually.
>>>>>>
>>>>>> For process-based execution we use Flink's cache distribution
>> instead
>>>> of
>>>>>> Beam's artifact staging.
>>>>>>
>>>>>>> Apache Beam Portability Framework already supports artifact
>> staging
>>>>> that
>>>>>> works out of the box with the Docker environment. We can use the
>>>> artifact
>>>>>> staging service defined in Apache Beam to transfer the dependencies
>>>> from
>>>>>> the operator to Python SDK harness running in the docker container.
>>>>>>
>>>>>> Do we want to implement two different ways of staging artifacts? It
>>>>>> seems sensible to use the same artifact staging functionality also
>>> for
>>>>>> the process-based execution. Apart from being simpler, this would
>>> also
>>>>>> allow the process-based execution to run in other environments than
>>> the
>>>>>> Flink TaskManager environment.
>>>>>>
>>>>>> Thanks,
>>>>>> Max
>>>>>>
>>>>>> On 15.10.19 11:13, Wei Zhong wrote:
>>>>>>> Hi Thomas,
>>>>>>>
>>>>>>> Thanks a lot for your suggestion!
>>>>>>>
>>>>>>> As you can see from the section "Goals" that this FLIP focuses on
>>> the
>>>>>> dependency management in process mode. However, the APIs and design
>>>>>> proposed in this FLIP also applies for the docker mode. So it makes
>>>> sense
>>>>>> to me to also describe how this design is integated to the artifact
>>>>> staging
>>>>>> service of Apache Beam in docker mode. I have updated the design
>> doc
>>>> and
>>>>>> looking forward to your feedback.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Wei
>>>>>>>
>>>>>>>> 在 2019年10月15日,01:54,Thomas Weise <[hidden email]> 写道:
>>>>>>>>
>>>>>>>> Sorry for joining the discussion late.
>>>>>>>>
>>>>>>>> The Beam environment already supports artifact staging, it works
>>> out
>>>>> of
>>>>>> the
>>>>>>>> box with the Docker environment. I think it would be helpful to
>>>>> explain
>>>>>> in
>>>>>>>> the FLIP how this proposal relates to what Beam offers / how it
>>>> would
>>>>> be
>>>>>>>> integrated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Oct 14, 2019 at 8:09 AM Jeff Zhang <[hidden email]>
>>>> wrote:
>>>>>>>>
>>>>>>>>> +1
>>>>>>>>>
>>>>>>>>> Hequn Cheng <[hidden email]> 于2019年10月14日周一 下午10:55写道:
>>>>>>>>>
>>>>>>>>>> +1
>>>>>>>>>>
>>>>>>>>>> Good job, Wei!
>>>>>>>>>>
>>>>>>>>>> Best, Hequn
>>>>>>>>>>
>>>>>>>>>> On Mon, Oct 14, 2019 at 2:54 PM Dian Fu <
>> [hidden email]>
>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Wei,
>>>>>>>>>>>
>>>>>>>>>>> +1 (non-binding). Thanks for driving this.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dian
>>>>>>>>>>>
>>>>>>>>>>>> On 2019-10-14, at 13:40, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> +1
>>>>>>>>>>>>
>>>>>>>>>>>> Wei Zhong <[hidden email]> wrote on Sat, Oct 12, 2019 at 8:41 PM:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would like to start the vote for FLIP-78[1] which is
>>>> discussed
>>>>>> and
>>>>>>>>>>>>> reached consensus in the discussion thread[2].
>>>>>>>>>>>>>
>>>>>>>>>>>>> The vote will be open for at least 72 hours. I'll try to
>>> close
>>>> it
>>>>>> by
>>>>>>>>>>>>> 2019-10-16 18:00 UTC, unless there is an objection or not
>>>> enough
>>>>>>>>>> votes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Wei
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-78%3A+Flink+Python+UDF+Environment+and+Dependency+Management
>>>>>>>>>>>>> <
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-78:+Flink+Python+UDF+Environment+and+Dependency+Management
>>>>>>>>>>>>>>
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-UDF-Environment-and-Dependency-Management-td33514.html
>>>>>>>>>>>>> <
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-UDF-Environment-and-Dependency-Management-td33514.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Jeff Zhang
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

mxm

Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

mxm
Hi Wei, hi Jincheng,

+1 on the current approach.

I agree it would be nice to allow the Beam artifact staging to use
Flink's BlobServer. However, the current implementation, which uses the
distributed file system, is more generic, since the BlobServer is only
available on the TaskManager and not necessarily inside Harness
containers (Stage 3).

So for stage 1 (Client <=> JobServer) we could certainly use the
BlobServer. Stage 2 (Flink job submission) already uses it, and Stage 3
(container setup) probably has to have some form of distributed file
system or directory which has been populated with the dependencies.

Thanks,
Max

On 25.10.19 03:45, Wei Zhong wrote:

> Hi Max,
>
> Are there any other concerns from your side? I'd appreciate it if you could give some feedback and vote on this.
>
> Best,
> Wei
>
>> On 2019-10-25, at 09:33, jincheng sun <[hidden email]> wrote:
>>
>> Hi Thomas,
>>
>> Thanks for your explanation. I understand your original intention. I will
>> seriously consider this issue. After I have the initial solution, I will
>> bring up a further discussion in Beam ML.
>>
>> Thanks for your voting. :)
>>
>> Best,
>> Jincheng
>>
>>
>> Thomas Weise <[hidden email]> wrote on Fri, Oct 25, 2019 at 7:32 AM:
>>
>>> Hi Jincheng,
>>>
>>> Yes, this topic can be further discussed on the Beam ML. The only reason I
>>> brought it up here is that it would be desirable, from the Beam Flink
>>> runner's perspective, for the artifact staging mechanism that you work on
>>> to be reusable.
>>>
>>> Stage 1 in Beam is also up to the runner: artifact staging is a service
>>> discovered from the job server, and the fact that the Flink job server
>>> currently uses DFS is not set in stone. My interest was more about
>>> assumptions regarding the artifact structure, which may or may not allow
>>> for a reusable implementation.
>>>
>>> +1 for the proposal otherwise
>>>
>>> Thomas
>>>
>>>
>>> On Mon, Oct 21, 2019 at 8:40 PM jincheng sun <[hidden email]>
>>> wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> Thanks for sharing your thoughts. I think improving and solving the
>>>> limitations of Beam artifact staging is a good topic (for Beam).
>>>>
>>>> My understanding is as follows:
>>>>
>>>> For Beam (data flow):
>>>>     Stage 1: BeamClient ------> JobService (data is uploaded to DFS).
>>>>     Stage 2: JobService (FlinkClient) ------> Flink job (the operator
>>>> downloads the data from DFS).
>>>>     Stage 3: Operator ------> Harness (artifact staging service).
>>>>
>>>> For Flink (data flow):
>>>>     Stage 1: FlinkClient (local data is uploaded to the BlobServer via
>>>> the distributed cache) ------> Operator (data is downloaded from the
>>>> BlobServer). There is no need to depend on DFS.
>>>>     Stage 2: Operator ------> Harness (for docker we use the artifact
>>>> staging service).
>>>>
>>>> So, I think Beam has to depend on DFS in Stage 1, and Stage 2 can use the
>>>> distributed cache if we remove the DFS dependency for Beam in Stage 1 (of
>>>> course we need more detail here). We can bring up that discussion in a
>>>> separate Beam dev@ ML thread. The current discussion focuses on the Flink
>>>> 1.10 version of UDF Environment and Dependency Management for Python, so
>>>> I recommend voting in the current ML thread for Flink 1.10; Beam artifact
>>>> staging improvements can be discussed separately on Beam dev@.
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Jincheng
>>>>
>>>> Thomas Weise <[hidden email]> wrote on Mon, Oct 21, 2019 at 10:25 PM:
>>>>
>>>>> Beam artifact staging currently relies on a shared file system, and
>>>>> there are limitations, for example when running locally with Docker and
>>>>> a local FS. It sounds like a distributed-cache-based implementation
>>>>> might be a good (better?) option for artifact staging even for the Beam
>>>>> Flink runner?
>>>>>
>>>>> If so, can the implementation you propose be compatible with the Beam
>>>>> artifact staging service so that it can be plugged into the Beam Flink
>>>>> runner?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Mon, Oct 21, 2019 at 2:34 AM jincheng sun <[hidden email]
>>>>
>>>>> wrote:
>>>>>
>>>>>> Hi Max,
>>>>>>
>>>>>> Sorry for the late reply. Regarding the issue you mentioned above,
>>> I'm
>>>>> glad
>>>>>> to share my thoughts:
>>>>>>
>>>>>>> For process-based execution we use Flink's cache distribution
>>> instead
>>>>> of
>>>>>> Beam's artifact staging.
>>>>>>
>>>>>> In the current design, we use Flink's cache distribution to upload
>>>>>> users' files from the client to the cluster in both docker mode and
>>>>>> process mode. That is, Flink's cache distribution and Beam's artifact
>>>>>> staging service work together in docker mode.
>>>>>>
>>>>>>
>>>>>>> Do we want to implement two different ways of staging artifacts? It
>>>>> seems
>>>>>> sensible to use the same artifact staging functionality also for the
>>>>>> process-based execution.
>>>>>>
>>>>>> I agree that the implementation would be simpler if we used the same
>>>>>> artifact staging functionality for the process-based execution as well.
>>>>>> However, it's not the best for performance, as it would introduce an
>>>>>> additional network transmission: in process mode the TaskManager and
>>>>>> the Python worker share the same environment, so the user files in
>>>>>> Flink's distributed cache can be accessed by the Python worker
>>>>>> directly. We do not need the staging service in this case.
>>>>>>
>>>>>>> Apart from being simpler, this would also allow the process-based
>>>>>> execution to run in other environments than the Flink TaskManager
>>>>>> environment.
>>>>>>
>>>>>> IMHO, this case is more like docker mode, and we can share or reuse
>>>>>> the code of the Beam docker mode. Furthermore, in this case the Python
>>>>>> worker is launched by the operator, so it is always in the same
>>>>>> environment as the operator.
>>>>>>
>>>>>> Thanks again for your feedback; it is valuable for finding the best
>>>>>> final architecture.
>>>>>>
>>>>>> Feel free to correct me if there is anything incorrect.
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
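The trade-off Jincheng describes above — in process mode the Python worker shares the TaskManager's filesystem and can read the distributed-cache files directly, while in docker mode the files must be staged into the container via Beam's artifact staging service — can be sketched roughly as follows. All names here (`resolve_dependency`, `fake_stage`, the mode strings) are illustrative assumptions, not Flink's actual implementation:

```python
# Hypothetical sketch of the dependency-resolution choice discussed above:
# process mode reads the TaskManager's local copy directly; docker mode
# needs an artifact-staging transfer into the container first.

def resolve_dependency(mode, cached_file_path, stage_fn=None):
    """Return the path the Python worker should read the dependency from."""
    if mode == "process":
        # Worker shares the TaskManager's filesystem: no extra transfer.
        return cached_file_path
    if mode == "docker":
        # Worker runs in a container: the file must be staged into it.
        return stage_fn(cached_file_path)
    raise ValueError("unknown execution mode: %s" % mode)


def fake_stage(path):
    # Stand-in for an artifact-staging transfer into the container.
    return "/opt/staged" + path


print(resolve_dependency("process", "/tmp/dist_cache/udf.py"))
# -> /tmp/dist_cache/udf.py
print(resolve_dependency("docker", "/tmp/dist_cache/udf.py", fake_stage))
# -> /opt/staged/tmp/dist_cache/udf.py
```

The sketch only captures the dispatch; in practice the docker branch would talk to the Beam artifact staging service rather than copy a path.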
>>>>>> Maximilian Michels <[hidden email]> wrote on Wed, Oct 16, 2019 at 4:23 PM:
>>>>>>
>>>>>>> I'm also late to the party here :) When I saw the first draft, I
>>> was
>>>>>>> thinking how exactly the design doc would tie in with Beam. Thanks
>>>> for
>>>>>>> the update.
>>>>>>>
>>>>>>> A couple of comments with this regard:
>>>>>>>
>>>>>>>> Flink has provided a distributed cache mechanism and allows users
>>>> to
>>>>>>> upload their files using "registerCachedFile" method in
>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. The python files
>>>> users
>>>>>>> specified through "add_python_file", "set_python_requirements" and
>>>>>>> "add_python_archive" are also uploaded through this method
>>>> eventually.
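The FLIP text quoted above describes how the three Python dependency APIs all funnel into Flink's distributed cache via `registerCachedFile`. A minimal sketch of that mapping, assuming a simple name-to-path registry (the class and key names below are illustrative only, not the real PyFlink implementation):

```python
# Hypothetical sketch: how "add_python_file", "set_python_requirements"
# and "add_python_archive" could map onto registerCachedFile-style
# (name -> local path) entries. Key names are illustrative assumptions.

class DependencyManager:
    """Collects user dependencies for registration in the distributed cache."""

    def __init__(self):
        self.cached_files = {}  # name -> local path

    def add_python_file(self, file_path):
        key = "python_file_{}".format(len(self.cached_files))
        self.cached_files[key] = file_path

    def set_python_requirements(self, requirements_file, cache_dir=None):
        self.cached_files["python_requirements_file"] = requirements_file
        if cache_dir is not None:
            self.cached_files["python_requirements_cache"] = cache_dir

    def add_python_archive(self, archive_path, target_dir):
        self.cached_files["python_archive_{}".format(target_dir)] = archive_path


mgr = DependencyManager()
mgr.add_python_file("/tmp/my_udf_utils.py")
mgr.set_python_requirements("requirements.txt", "/tmp/pip_cache")
mgr.add_python_archive("venv.zip", "myenv")
print(sorted(mgr.cached_files))
```

Each collected entry would then be registered through `registerCachedFile` on the execution environment, so the cluster side can fetch it by name.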
>>>>>>>
>>>>>>> For process-based execution we use Flink's cache distribution
>>> instead
>>>>> of
>>>>>>> Beam's artifact staging.
>>>>>>>

Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

jincheng sun
Hi Max,

Thanks for your feedback. You are right, we really need a more generic
solution. I volunteer to draft an initial design doc and bring up the
discussion on the Beam dev@ ML ASAP (maybe after the release of Flink 1.10).

Thank you for voting.

Best,
Jincheng


Re: [VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Wei Zhong-2
Thanks everyone for the votes!
I’ll summarize the voting result in a separate email.

Best,
Wei

> 在 2019年10月28日,11:38,jincheng sun <[hidden email]> 写道:
>
> Hi Max,
>
> Thanks for your feedback. You are right, we really need a more generic
> solution,  I volunteer to draft an init solution design doc, and bring up
> the discussion in Beam @dev ASAP. (Maybe after release of Flink 1.10).
>
> Thank you for the voting.
>
> Best,
> Jincheng
>
> Maximilian Michels <[hidden email]> 于2019年10月26日周六 上午1:05写道:
>
>> Hi Wei, hi Jincheng,
>>
>> +1 on the current approach.
>>
>> I agree it would be nice to allow for the Beam artifact staging to use
>> Flink's BlobServer. However, the current implementation which uses the
>> distributed file system is more generic, since the BlobServer is only
>> available on the TaskManager and not necessarily inside Harness
>> containers (Stage 3).
>>
>> So for stage 1 (Client <=> JobServer) we could certainly use the
>> BlobServer. Stage 2 (Flink job submission) already uses it, and Stage 3
>> (container setup) probably has to have some form of distributed file
>> system or directory which has been populated with the dependencies.
>>
>> Thanks,
>> Max
>>
>> On 25.10.19 03:45, Wei Zhong wrote:
>>> Hi Max,
>>>
>>> Is there any other concerns from your side? I appreciate if you can give
>> some feedback and vote on this.
>>>
>>> Best,
>>> Wei
>>>
>>>> 在 2019年10月25日,09:33,jincheng sun <[hidden email]> 写道:
>>>>
>>>> Hi Thomas,
>>>>
>>>> Thanks for your explanation. I understand your original intention. I
>> will
>>>> seriously consider this issue. After I have the initial solution, I will
>>>> bring up a further discussion in Beam ML.
>>>>
>>>> Thanks for your voting. :)
>>>>
>>>> Best,
>>>> Jincheng
>>>>
>>>>
>>>> Thomas Weise <[hidden email]> 于2019年10月25日周五 上午7:32写道:
>>>>
>>>>> Hi Jincheng,
>>>>>
>>>>> Yes, this topic can be further discussed on the Beam ML. The only
>> reason I
>>>>> brought it up here is that it would be desirable from Beam Flink runner
>>>>> perspective for the artifact staging mechanism that you work on to be
>>>>> reusable.
>>>>>
>>>>> Stage 1 in Beam is also up to the runner, artifact staging is a service
>>>>> discovered from the job server and that the Flink job server currently
>> uses
>>>>> DFS is not set in stone. My interest was more regarding assumptions
>>>>> regarding the artifact structure, which may or may not allow for
>> reusable
>>>>> implementation.
>>>>>
>>>>> +1 for the proposal otherwise
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Mon, Oct 21, 2019 at 8:40 PM jincheng sun <[hidden email]
>>>
>>>>> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> Thanks for sharing your thoughts. I think improve and solve the
>>>>> limitations
>>>>>> of the Beam artifact staging is good topic(For beam).
>>>>>>
>>>>>> As I understand it as follows:
>>>>>>
>>>>>> For Beam(data):
>>>>>>    Stage1: BeamClient ------> JobService (data will be upload to
>> DFS).
>>>>>>    Stage2: JobService(FlinkClient) ------>  FlinkJob (operator
>> download
>>>>>> the data from DFS)
>>>>>>    Stage3: Operator ------> Harness(artifact staging service)
>>>>>>
>>>>>> For Flink(data):
>>>>>>    Stage1: FlinkClient(data(local) upload to BlobServer using
>> distribute
>>>>>> cache) ------> Operator (data will be download from BlobServer). Do
>> not
>>>>>> have to depend on DFS.
>>>>>>    Stage2: Operator ------> Harness(for docker we using artifact
>> staging
>>>>>> service)
>>>>>>
>>>>>> So, I think Beam have to depend on DFS in Stage1. and Stage2 can using
>>>>>> distribute cache if we remove the dependency of DFS for Beam in
>>>>> Stage1.(Of
>>>>>> course we need more detail here),  we can bring up the discussion in a
>>>>>> separate Beam dev@ ML, the current discussion focuses on Flink 1.10
>>>>>> version
>>>>>> of  UDF Environment and Dependency Management for python, so I
>> recommend
>>>>>> voting in the current ML for Flink 1.10, Beam artifact staging
>>>>> improvements
>>>>>> are discussed in a separate Beam dev@.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>> Thomas Weise <[hidden email]> 于2019年10月21日周一 下午10:25写道:
>>>>>>
>>>>>>> Beam artifact staging currently relies on shared file system and
>> there
>>>>>> are
>>>>>>> limitations, for example when running locally with Docker and local
>> FS.
>>>>>> It
>>>>>>> sounds like a distributed cache based implementation might be a good
>>>>>>> (better?) option for artifact staging even for the Beam Flink runner?
>>>>>>>
>>>>>>> If so, can the implementation you propose be compatible with the Beam
>>>>>>> artifact staging service so that it can be plugged into the Beam
>> Flink
>>>>>>> runner?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 21, 2019 at 2:34 AM jincheng sun <
>> [hidden email]
>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Max,
>>>>>>>>
>>>>>>>> Sorry for the late reply. Regarding the issue you mentioned above,
>>>>> I'm
>>>>>>> glad
>>>>>>>> to share my thoughts:
>>>>>>>>
>>>>>>>>> For process-based execution we use Flink's cache distribution
>>>>> instead
>>>>>>> of
>>>>>>>> Beam's artifact staging.
>>>>>>>>
>>>>>>>> In current design, we use Flink's cache distribution to upload
>> users'
>>>>>>> files
>>>>>>>> from client to cluster in both docker mode and process mode. That
>> is,
>>>>>>>> Flink's cache distribution and Beam's artifact staging service work
>>>>>>>> together in docker mode.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Do we want to implement two different ways of staging artifacts? It
>>>>>>> seems
>>>>>>>> sensible to use the same artifact staging functionality also for the
>>>>>>>> process-based execution.
>>>>>>>>
>>>>>>>> I agree that the implementation will be simpler if we use the same
>>>>>>>> artifact staging functionality also for the process-based execution.
>>>>>>>> However, it's not the best for performance, as it would introduce an
>>>>>>>> additional network transmission. In process mode the TaskManager and
>>>>>>>> the Python worker share the same environment, so the user files in
>>>>>>>> Flink's Distributed Cache can be accessed by the Python worker
>>>>>>>> directly. We do not need the staging service in this case.
>>>>>>>>
>>>>>>>>> Apart from being simpler, this would also allow the process-based
>>>>>>>> execution to run in other environments than the Flink TaskManager
>>>>>>>> environment.
>>>>>>>>
>>>>>>>> IMHO, this case is more like docker mode, and we can share or reuse
>>>>>>>> the code of Beam's docker mode. Furthermore, in this case the Python
>>>>>>>> worker is launched by the operator, so it is always in the same
>>>>>>>> environment as the operator.
>>>>>>>>
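As a rough illustration of the process-mode vs. docker-mode distinction described above — a toy sketch only; the function and parameter names below are made up for illustration and are not the real Flink or Beam API:

```python
# Toy sketch of the mode-dependent dependency resolution described above.
# In process mode the Python worker shares the TaskManager's filesystem,
# so it can read the distributed-cache files directly; in docker mode the
# files must be pushed into the SDK harness container via Beam's artifact
# staging service. All names here are illustrative, not real APIs.
def resolve_dependencies(mode, cached_files, stage_to_container=None):
    if mode == "process":
        # Same environment as the operator: use the cache paths as-is,
        # no extra network transmission.
        return dict(cached_files)
    if mode == "docker":
        # The operator stages each file into the harness container.
        return {name: stage_to_container(path)
                for name, path in cached_files.items()}
    raise ValueError(f"unknown execution mode: {mode}")
```

The point of the sketch is just that the docker branch is the only one that needs a per-file transfer step, which is the extra network hop the paragraph above argues process mode can avoid.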
>>>>>>>> Thanks again for your feedback; it is valuable for finding out the
>>>>>>>> final best architecture.
>>>>>>>>
>>>>>>>> Feel free to correct me if there is anything incorrect.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jincheng
>>>>>>>>
>>>>>>>> Maximilian Michels <[hidden email]> 于2019年10月16日周三 下午4:23写道:
>>>>>>>>
>>>>>>>>> I'm also late to the party here :) When I saw the first draft, I
>>>>> was
>>>>>>>>> thinking how exactly the design doc would tie in with Beam. Thanks
>>>>>> for
>>>>>>>>> the update.
>>>>>>>>>
>>>>>>>>> A couple of comments in this regard:
>>>>>>>>>
>>>>>>>>>> Flink has provided a distributed cache mechanism and allows users
>>>>>> to
>>>>>>>>> upload their files using "registerCachedFile" method in
>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. The python files
>>>>>> users
>>>>>>>>> specified through "add_python_file", "set_python_requirements" and
>>>>>>>>> "add_python_archive" are also uploaded through this method
>>>>>> eventually.
>>>>>>>>>
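To make the quoted design-doc excerpt concrete: the user-facing methods (add_python_file, set_python_requirements, add_python_archive) all funnel into the same cached-file registration that registerCachedFile provides. A toy model — the class, key names, and internals are made up for illustration and are not the real PyFlink implementation:

```python
# Toy model of FLIP-78's dependency registration: the client records
# (key, path) pairs that registerCachedFile would ship via Flink's
# distributed cache, and the Python worker later resolves them by key.
# Class and key names are illustrative, not the real PyFlink internals.
class DependencyRegistry:
    def __init__(self):
        self._cached_files = {}  # cache key -> local path on the client

    def add_python_file(self, path):
        # Each Python file gets its own numbered cache entry.
        self._cached_files[f"python_file_{len(self._cached_files)}"] = path

    def set_python_requirements(self, requirements_path, cache_dir=None):
        # The requirements file (and optional directory of pre-downloaded
        # packages) is shipped so workers can install offline.
        self._cached_files["python_requirements"] = requirements_path
        if cache_dir is not None:
            self._cached_files["requirements_cache"] = cache_dir

    def add_python_archive(self, archive_path, target_dir=None):
        # Archives (e.g. a packed virtualenv) are extracted under target_dir.
        key = f"python_archive_{target_dir or 'default'}"
        self._cached_files[key] = archive_path

    def shipped_entries(self):
        # What the TaskManager-side worker sees in the distributed cache.
        return dict(self._cached_files)
```

The single `shipped_entries` map is the "uploaded through this method eventually" part of the excerpt: one distribution mechanism regardless of which API the user called.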
>>>>>>>>> For process-based execution we use Flink's cache distribution
>>>>> instead
>>>>>>> of
>>>>>>>>> Beam's artifact staging.
>>>>>>>>>
>>>>>>>>>> Apache Beam Portability Framework already supports artifact
>>>>> staging
>>>>>>>> that
>>>>>>>>> works out of the box with the Docker environment. We can use the
>>>>>>> artifact
>>>>>>>>> staging service defined in Apache Beam to transfer the dependencies
>>>>>>> from
>>>>>>>>> the operator to Python SDK harness running in the docker container.
>>>>>>>>>
>>>>>>>>> Do we want to implement two different ways of staging artifacts? It
>>>>>>>>> seems sensible to use the same artifact staging functionality also
>>>>>> for
>>>>>>>>> the process-based execution. Apart from being simpler, this would
>>>>>> also
>>>>>>>>> allow the process-based execution to run in other environments than
>>>>>> the
>>>>>>>>> Flink TaskManager environment.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> On 15.10.19 11:13, Wei Zhong wrote:
>>>>>>>>>> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for your suggestion!
>>>>>>>>>>
>>>>>>>>>> As you can see from the section "Goals", this FLIP focuses on the
>>>>>>>>> dependency management in process mode. However, the APIs and design
>>>>>>>>> proposed in this FLIP also apply to the docker mode. So it makes sense
>>>>>>>>> to me to also describe how this design is integrated into the artifact
>>>>>>>>> staging service of Apache Beam in docker mode. I have updated the design
>>>>>>>>> doc and am looking forward to your feedback.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Wei
>>>>>>>>>>
>>>>>>>>>>> 在 2019年10月15日,01:54,Thomas Weise <[hidden email]> 写道:
>>>>>>>>>>>
>>>>>>>>>>> Sorry for joining the discussion late.
>>>>>>>>>>>
>>>>>>>>>>> The Beam environment already supports artifact staging, it works
>>>>>> out
>>>>>>>> of
>>>>>>>>> the
>>>>>>>>>>> box with the Docker environment. I think it would be helpful to
>>>>>>>> explain
>>>>>>>>> in
>>>>>>>>>>> the FLIP how this proposal relates to what Beam offers / how it
>>>>>>> would
>>>>>>>> be
>>>>>>>>>>> integrated.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 14, 2019 at 8:09 AM Jeff Zhang <[hidden email]>
>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> +1
>>>>>>>>>>>>
>>>>>>>>>>>> Hequn Cheng <[hidden email]> 于2019年10月14日周一 下午10:55写道:
>>>>>>>>>>>>
>>>>>>>>>>>>> +1
>>>>>>>>>>>>>
>>>>>>>>>>>>> Good job, Wei!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Hequn
>>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>


[RESULT][VOTE] FLIP-78: Flink Python UDF Environment and Dependency Management

Wei Zhong-2
Hi all,

The voting time for FLIP-78 has passed. I'm closing the vote now.

There were 6 +1 votes, 4 of which are binding:
- Jincheng (binding)
- Hequn (binding)
- Thomas (binding)
- Maximilian (binding)
- Dian (non-binding)
- Jeff (non-binding)

There were no disapproving votes.

Thus, FLIP-78 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

Best,
Wei

> 在 2019年10月28日,15:06,Wei Zhong <[hidden email]> 写道:
>
> Thanks everyone for the votes!
> I’ll summarize the voting result in a separate email.
>
> Best,
> Wei
>
>> 在 2019年10月28日,11:38,jincheng sun <[hidden email]> 写道:
>>
>> Hi Max,
>>
>> Thanks for your feedback. You are right, we really need a more generic
>> solution. I volunteer to draft an initial design doc and bring up the
>> discussion on the Beam dev@ list ASAP (maybe after the release of Flink 1.10).
>>
>> Thank you for the voting.
>>
>> Best,
>> Jincheng
>>
>> Maximilian Michels <[hidden email]> 于2019年10月26日周六 上午1:05写道:
>>
>>> Hi Wei, hi Jincheng,
>>>
>>> +1 on the current approach.
>>>
>>> I agree it would be nice to allow for the Beam artifact staging to use
>>> Flink's BlobServer. However, the current implementation which uses the
>>> distributed file system is more generic, since the BlobServer is only
>>> available on the TaskManager and not necessarily inside Harness
>>> containers (Stage 3).
>>>
>>> So for stage 1 (Client <=> JobServer) we could certainly use the
>>> BlobServer. Stage 2 (Flink job submission) already uses it, and Stage 3
>>> (container setup) probably has to have some form of distributed file
>>> system or directory which has been populated with the dependencies.
>>>
>>> Thanks,
>>> Max
>>>
>>> On 25.10.19 03:45, Wei Zhong wrote:
>>>> Hi Max,
>>>>
>>>> Is there any other concerns from your side? I appreciate if you can give
>>> some feedback and vote on this.
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>>> 在 2019年10月25日,09:33,jincheng sun <[hidden email]> 写道:
>>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> Thanks for your explanation. I understand your original intention. I
>>> will
>>>>> seriously consider this issue. After I have the initial solution, I will
>>>>> bring up a further discussion in Beam ML.
>>>>>
>>>>> Thanks for your voting. :)
>>>>>
>>>>> Best,
>>>>> Jincheng
>>>>>
>>>>>
>>>>> Thomas Weise <[hidden email]> 于2019年10月25日周五 上午7:32写道:
>>>>>
>>>>>> Hi Jincheng,
>>>>>>
>>>>>> Yes, this topic can be further discussed on the Beam ML. The only
>>>>>> reason I brought it up here is that it would be desirable, from the Beam
>>>>>> Flink runner's perspective, for the artifact staging mechanism that you
>>>>>> work on to be reusable.
>>>>>>
>>>>>> Stage 1 in Beam is also up to the runner: artifact staging is a service
>>>>>> discovered from the job server, and the fact that the Flink job server
>>>>>> currently uses DFS is not set in stone. My interest was more about
>>>>>> assumptions regarding the artifact structure, which may or may not allow
>>>>>> for a reusable implementation.
>>>>>>
>>>>>> +1 for the proposal otherwise
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 21, 2019 at 8:40 PM jincheng sun <[hidden email]
>>>>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Thomas,
>>>>>>>
>>>>>>> Thanks for sharing your thoughts. I think improving and solving the
>>>>>>> limitations of Beam's artifact staging is a good topic (for Beam).
>>>>>>>
>>>>>>> My understanding is as follows:
>>>>>>>
>>>>>>> For Beam (data):
>>>>>>>   Stage 1: BeamClient ------> JobService (data is uploaded to DFS).
>>>>>>>   Stage 2: JobService (FlinkClient) ------> FlinkJob (the operator
>>>>>>>     downloads the data from DFS).
>>>>>>>   Stage 3: Operator ------> Harness (artifact staging service).
>>>>>>>
>>>>>>> For Flink (data):
>>>>>>>   Stage 1: FlinkClient (local data is uploaded to the BlobServer via
>>>>>>>     the distributed cache) ------> Operator (data is downloaded from
>>>>>>>     the BlobServer). No dependency on DFS.
>>>>>>>   Stage 2: Operator ------> Harness (for docker we use the artifact
>>>>>>>     staging service).
>>>>>>>
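A toy simulation of the two flows sketched above, just to make explicit where the artifact bytes travel and that only the Beam flow needs a shared DFS. All names are illustrative stand-ins, not real Flink or Beam APIs:

```python
# Toy model of the two staging pipelines: each dict stands in for a storage
# service. Only the movement of the artifact bytes is modeled.

def beam_staging(artifact, dfs):
    # Stage 1: BeamClient -> JobService, which uploads the artifact to DFS.
    dfs["staged/artifact"] = artifact
    # Stage 2: the Flink operator downloads it from DFS.
    on_operator = dfs["staged/artifact"]
    # Stage 3: the operator serves it to the SDK harness via the
    # artifact staging service.
    return on_operator

def flink_staging(artifact, blob_server):
    # Stage 1: FlinkClient uploads to the BlobServer (distributed cache)
    # and the operator downloads it from there -- no DFS required.
    blob_server["cached/artifact"] = artifact
    on_operator = blob_server["cached/artifact"]
    # Stage 2: in docker mode the operator stages it to the harness;
    # in process mode the worker reads the cached file directly.
    return on_operator
```

The `dfs` dict is touched only by the Beam flow, which is the dependency the following paragraph proposes removing.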
>>>>>>> So, I think Beam has to depend on DFS in Stage 1, and Stage 2 could use
>>>>>>> the distributed cache if we remove Beam's dependency on DFS in Stage 1
>>>>>>> (of course we need more detail here). We can bring up that discussion in
>>>>>>> a separate thread on the Beam dev@ ML. The current discussion focuses on
>>>>>>> the Flink 1.10 version of UDF environment and dependency management for
>>>>>>> Python, so I recommend voting in the current ML thread for Flink 1.10,
>>>>>>> and discussing Beam artifact staging improvements separately on Beam dev@.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Jincheng
>>>>>>>