[DISCUSS] Support scalar vectorized Python UDF in PyFlink


[DISCUSS] Support scalar vectorized Python UDF in PyFlink

Dian Fu
Hi all,

Scalar Python UDF is already supported in the upcoming release 1.10 (FLIP-58[1]). It operates one row at a time: the Java operator serializes one input row to bytes and sends it to the Python worker; the Python worker deserializes the input row and evaluates the Python UDF with it; the result row is serialized and sent back to the Java operator.

It suffers from the following problems:
1) High serialization/deserialization overhead
2) It’s difficult to leverage the popular Python libraries used by data scientists, such as Pandas, NumPy, etc., which provide high-performance data structures and functions.

Jincheng and I have discussed offline and we want to introduce vectorized Python UDFs to address the above problems. This feature has also been mentioned in the discussion thread about the Python API plan[2]. For vectorized Python UDFs, a batch of rows is transferred between the JVM and the Python VM in columnar format. The batch of rows is converted to a collection of pandas.Series and given to the vectorized Python UDF, which can then leverage popular Python libraries such as Pandas, NumPy, etc. for its implementation.
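To make the idea concrete, here is a rough sketch of what a vectorized Python UDF could look like from the user's perspective. Note this is only an illustration: the decorator argument marking the UDF as vectorized ("udf_type" below) and the exact signature are assumptions; the actual API is described in the design doc[3].

import pandas as pd

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# "udf_type" is a hypothetical flag marking the UDF as vectorized.
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(),
     udf_type="pandas")
def add(i: pd.Series, j: pd.Series) -> pd.Series:
    # i and j each hold one column of the whole batch, so the
    # addition is evaluated on the entire batch at once.
    return i + j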

Please refer to the design doc[3] for more details. Any feedback is welcome.

Regards,
Dian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-What-parts-of-the-Python-API-should-we-focus-on-next-tt36119.html
[3] https://docs.google.com/document/d/1ls0mt96YV1LWPHfLOh8v7lpgh1KsCNx8B9RrW1ilnoE/edit#heading=h.qivada1u8hwd


Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

Jingsong Li
Hi Dian,

+1 for this, thanks for driving it.
The documentation looks very good. I can imagine a huge performance improvement
and better integration with other Python libraries.

A few thoughts:
- About data splitting: can we unify "python.fn-execution.arrow.batch.size" with "python.fn-execution.bundle.size"?
- Use of Apache Arrow as the exchange format: do you mean that Arrow supports zero-copy between Java and Python?
- It seems ArrowFieldWriter could be implemented via code generation, but it is OK for the initial version to use virtual function calls.
- For ColumnarRow-based vectorized reading, it seems we need to implement ArrowColumnVectors.

Best,
Jingsong Lee

--
Best, Jingsong Lee

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

Dian Fu
Hi Jingsong,

Thanks a lot for the valuable feedback.

1. The configurations "python.fn-execution.bundle.size" and "python.fn-execution.arrow.batch.size" serve separate purposes and I think both are needed. If they were unified, the Python operator would have to wait for the execution results of the previous batch of elements before processing the next batch, which means that Python UDF execution could not be pipelined between batches. With separate configurations there is no such problem.
2. It means that the Java operator converts the input elements to the Arrow memory format and then sends them to the Python worker, and vice versa (see the sketch below). Regarding the zero-copy benefits provided by Arrow, we gain them automatically by using Arrow.
3. Good point! As all the classes of the Python module are written in Java and it's not recommended to introduce new Scala classes, I guess it's not easy to do right now. But I think this is definitely a good improvement we can make in the future.
4. You're right, and we will add an Arrow ColumnVector for each supported type.
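To illustrate point 2, here is a minimal, self-contained sketch (using pyarrow and pandas directly rather than any Flink classes) of the columnar exchange: a batch of rows is converted to the Arrow format, each Arrow column becomes one pandas.Series handed to the vectorized UDF, and the result is converted back to Arrow:

import pandas as pd
import pyarrow as pa

# A batch of rows as it might arrive at the Java operator.
rows = [(1, 2.0), (2, 3.5), (3, 7.25)]

# Convert the row batch into the Arrow columnar format.
batch = pa.RecordBatch.from_arrays(
    [pa.array([r[0] for r in rows]), pa.array([r[1] for r in rows])],
    names=["a", "b"])

# Each Arrow column maps to one pandas.Series given to the UDF.
series_a = batch.column(0).to_pandas()
series_b = batch.column(1).to_pandas()

def my_vectorized_udf(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b  # evaluated on the whole batch at once

result = my_vectorized_udf(series_a, series_b)

# The result series is converted back to an Arrow array so it can be
# sent back to the Java operator as a new record batch.
result_batch = pa.RecordBatch.from_arrays(
    [pa.Array.from_pandas(result)], names=["c"])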

Thanks,
Dian


Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

jincheng sun
Hi Dian,

Thanks for bringing up this discussion. This is very important for the
PyFlink ecosystem. Adding Pandas support greatly enriches the available
UDF libraries of PyFlink and greatly improves the usability of PyFlink!

+1 for supporting scalar vectorized Python UDFs.

I think we should create a FLIP for this big enhancement. :)

What do you think?

Best,
Jincheng




Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

Hequn Cheng-2
Hi Dian,

Thanks a lot for bringing up the discussion!

It is great to see that the Pandas UDF feature is going to be introduced. I
think this would improve both the performance and the usability of
user-defined functions (UDFs) in Python.
One little suggestion: maybe it would be nice to add some
performance explanation to the document? (I'm just very curious :))

+1 to create a FLIP for this big enhancement.

Best,
Hequn


Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

Jingsong Li
Thanks Dian for your reply.

+1 to create a FLIP too.

About "python.fn-execution.bundle.size" and
"python.fn-execution.arrow.batch.size", I got what are you mean about
"pipeline". I agree.
It seems that a batch should always in a bundle. Bundle size should always
bigger than batch size. (if a batch can not cross bundle).
Can you explain this relationship to the document?
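For intuition, here is a minimal sketch (plain Python, not Flink code) of that relationship: the elements of one bundle are sliced into Arrow batches of at most batch_size elements, so a batch never crosses a bundle boundary and the last batch of a bundle may be smaller:

def split_bundle_into_batches(bundle, batch_size):
    # Slice the elements of one bundle into Arrow-sized batches.
    for start in range(0, len(bundle), batch_size):
        yield bundle[start:start + batch_size]

# A bundle of 10 elements with batch_size=4 yields batches of 4, 4 and 2:
# the final batch is flushed at the bundle boundary instead of crossing it.
print(list(split_bundle_into_batches(list(range(10)), 4)))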

I think the default values are very important; we can discuss:
- In the batch world, the vectorization batch size is usually about 1024+. What do you think the default value of "batch.size" should be?
- Can we configure only one parameter and calculate the other automatically? For example, if we just want to "pipeline", could "bundle.size" simply be twice "batch.size"?

Best,
Jingsong Lee


--
Best, Jingsong Lee

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

jincheng sun
Hi Jingsong,

Thanks for your feedback! I would like to share my thoughts regarding the
following question:

>> - Can we configure only one parameter and calculate the other automatically? For example, if we just want to "pipeline", could "bundle.size" simply be twice "batch.size"?

I don't think this works. These two configurations are used for different
purposes and there is no direct relationship between them, so I guess we
cannot infer one configuration from the other.

Best,
Jincheng



Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

Dian Fu-2
Hi Jincheng, Hequn & Jingsong,

Thanks a lot for your suggestions. I have created FLIP-97[1] for this
feature.

> One little suggestion: maybe it would be nice to add some performance explanation to the document? (I'm just very curious :))
Thanks for the suggestion. I have updated the "Background" section of the
design doc to explain where the performance gains come from.

> It seems that a batch should always be contained in a bundle, i.e., the bundle size should always be bigger than the batch size (if a batch cannot cross bundles). Could you explain this relationship in the document?
I have updated the design doc to explain these two configurations in more detail.

> In the batch world, the vectorization batch size is usually about 1024+. What do you think the default value of "batch.size" should be?
Is there a link for where this value comes from? I have run a simple test
with a Pandas UDF that performs a simple +1 operation; the performance was
best when the batch size was set to 5000. I think it depends on the data
type of each column, the functionality of the Pandas UDF, etc. However, I
agree with you that we could give the "batch" size a meaningful default
value that works in most scenarios.

> Can we configure only one parameter and calculate the other automatically? For example, if we just want to "pipeline", could "bundle.size" simply be twice "batch.size"?
I agree with Jincheng that this is not feasible. I think that giving
"batch.size" a meaningful default value that works in most scenarios
is enough. What are your thoughts?
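As a small usage sketch (assuming both options remain user-configurable and can be set through the table environment's configuration; the setter calls below are an assumption of how that could look, while the option keys are the ones discussed in this thread), a user who has measured a better batch size for their workload could tune both values per job:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

config = t_env.get_config().get_configuration()
# How many rows are put into one Arrow batch sent to the Python worker;
# 5000 is simply the value that worked best in the +1 micro-benchmark above.
config.set_string("python.fn-execution.arrow.batch.size", "5000")
# How many elements are processed per bundle; it should not be smaller
# than the Arrow batch size, since a batch does not cross bundles.
config.set_string("python.fn-execution.bundle.size", "10000")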

Thanks,
Dian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink



Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

Jingsong Li
Hi Dian and Jincheng,

Thanks for your explanation. Thinking about it again, maybe most users
won't want to modify these parameters.
We all realize that "batch.size" should be a larger value, so "bundle.size"
must also be increased; currently the default value of "bundle.size" is only 1000.
I think you could update the design to provide meaningful default values for
both "batch.size" and "bundle.size".

Best,
Jingsong Lee


--
Best, Jingsong Lee

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

Dian Fu-2
Hi Jingsong,

You're right. I have updated the FLIP to reflect this.

Thanks,
Dian

> 在 2020年2月11日,上午10:03,Jingsong Li <[hidden email]> 写道:
>
> Hi Dian and Jincheng,
>
> Thanks for your explanation. Think again. Maybe most of users don't want to
> modify this parameters.
> We all realize that "batch.size" should be a larger value, so "bundle.size"
> must also be increased. Now the default value of "bundle.size" is only 1000.
> I think you can update design to provide meaningful default value for
> "batch.size" and "bundle.size".
>
> Best,
> Jingsong Lee
>
> On Mon, Feb 10, 2020 at 4:36 PM Dian Fu <[hidden email]> wrote:
>
>> Hi Jincheng, Hequn & Jingsong,
>>
>> Thanks a lot for your suggestions. I have created FLIP-97[1] for this
>> feature.
>>
>>> One little suggestion: maybe it would be nice if we can add some
>> performance explanation in the document? (I just very curious:))
>> Thanks for the suggestion. I have updated the design doc in the
>> "BackGround" section about where the performance gains could be got from.
>>
>>> It seems that a batch should always in a bundle. Bundle size should
>> always
>> bigger than batch size. (if a batch can not cross bundle).
>> Can you explain this relationship to the document?
>> I have updated the design doc explaining more about these two
>> configurations.
>>
>>> In the batch world, vectorization batch size is about 1024+. What do you
>> think about the default value of "batch"?
>> Is there any link about where this value comes from? I have performed a
>> simple test for Pandas UDF which performs the simple +1 operation. The
>> performance is best when the batch size is set to 5000. I think it depends
>> on the data type of each column, the functionality the Pandas UDF does,
>> etc. However I agree with you that we could give a meaningful default value
>> for the "batch" size which works in most scenarios.
>>
>>> Can we only configure one parameter and calculate another automatically?
>> For example, if we just want to "pipeline", "bundle.size" is twice as much
>> as "batch.size", is this work?
>> I agree with Jincheng that this is not feasible. I think that giving an
>> meaningful default value for the "batch.size" which works in most scenarios
>> is enough. What's your thought?
>>
>> Thanks,
>> Dian
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink
>
>
> --
> Best, Jingsong Lee

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

Dian Fu-2
Hi all,

Thank you all for participating in this discussion and sharing your thoughts. It
seems that we have reached consensus on the design now. I will start a VOTE
thread if there is no further feedback.

Thanks,
Dian

On Tue, Feb 11, 2020 at 10:23 AM Dian Fu <[hidden email]> wrote:

> Hi Jingsong,
>
> You're right. I have updated the FLIP which reflects this.
>
> Thanks,
> Dian
>
> > > On Feb 11, 2020, at 10:03 AM, Jingsong Li <[hidden email]> wrote:
> >
> > Hi Dian and Jincheng,
> >
> > Thanks for your explanation. Think again. Maybe most of users don't want
> to
> > modify this parameters.
> > We all realize that "batch.size" should be a larger value, so
> "bundle.size"
> > must also be increased. Now the default value of "bundle.size" is only
> 1000.
> > I think you can update design to provide meaningful default value for
> > "batch.size" and "bundle.size".
> >
> > Best,
> > Jingsong Lee
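To make the relationship between the two sizes concrete: a bundle is split into Arrow batches, so the batch size should never exceed the bundle size, and a larger default batch size forces a larger default bundle size. A rough, Flink-independent illustration (the sizes below are placeholders, not proposed defaults):

    # Illustration only: splitting one bundle of elements into Arrow batches.
    bundle_size = 10000       # python.fn-execution.bundle.size (example value)
    arrow_batch_size = 5000   # python.fn-execution.arrow.batch.size (example value)

    elements = list(range(bundle_size))  # one bundle of input elements
    batches = [elements[i:i + arrow_batch_size]
               for i in range(0, len(elements), arrow_batch_size)]

    # Each full batch can be sent to the Python worker right away, so evaluating
    # the UDF on one batch overlaps with assembling the next one ("pipelining"),
    # while the bundle only bounds how many elements are in flight at a time.
    assert all(len(b) <= arrow_batch_size for b in batches)
    assert len(batches) * arrow_batch_size >= bundle_size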
> >
> > On Mon, Feb 10, 2020 at 4:36 PM Dian Fu <[hidden email]> wrote:
> >
> >> Hi Jincheng, Hequn & Jingsong,
> >>
> >> Thanks a lot for your suggestions. I have created FLIP-97[1] for this
> >> feature.
> >>
> >>> One little suggestion: maybe it would be nice if we can add some
> >> performance explanation in the document? (I just very curious:))
> >> Thanks for the suggestion. I have updated the design doc in the
> >> "BackGround" section about where the performance gains could be got
> from.
> >>
> >>> It seems that a batch should always in a bundle. Bundle size should
> >> always
> >> bigger than batch size. (if a batch can not cross bundle).
> >> Can you explain this relationship to the document?
> >> I have updated the design doc explaining more about these two
> >> configurations.
> >>
> >>> In the batch world, vectorization batch size is about 1024+. What do
> you
> >> think about the default value of "batch"?
> >> Is there any link about where this value comes from? I have performed a
> >> simple test for Pandas UDF which performs the simple +1 operation. The
> >> performance is best when the batch size is set to 5000. I think it
> depends
> >> on the data type of each column, the functionality the Pandas UDF does,
> >> etc. However I agree with you that we could give a meaningful default
> value
> >> for the "batch" size which works in most scenarios.
> >>
> >>> Can we only configure one parameter and calculate another
> automatically?
> >> For example, if we just want to "pipeline", "bundle.size" is twice as
> much
> >> as "batch.size", is this work?
> >> I agree with Jincheng that this is not feasible. I think that giving an
> >> meaningful default value for the "batch.size" which works in most
> scenarios
> >> is enough. What's your thought?
> >>
> >> Thanks,
> >> Dian
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink
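As background for the "arrow.batch.size" option referenced above: one Arrow batch is a columnar slice of the input rows, and on the Python side each column of that batch becomes the pandas.Series handed to the vectorized UDF. A small standalone sketch with pyarrow (independent of Flink, only to show the data layout; the column names are made up):

    import pyarrow as pa

    # A small batch of rows, written here as one plain Python list per column.
    names = ["a", "b", "c"]
    amounts = [1, 2, 3]

    # Columnar layout: one Arrow array per field, grouped into a record batch.
    batch = pa.RecordBatch.from_arrays(
        [pa.array(names), pa.array(amounts)], names=["name", "amount"])

    # On the worker side, each column is converted to a pandas.Series for the UDF.
    series_per_column = [column.to_pandas() for column in batch.columns]
    assert list(series_per_column[1] + 1) == [2, 3, 4]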