[DISCUSS] Flink Python User-Defined Function for Table API


Re: [DISCUSS] Flink Python User-Defined Function for Table API

Timo Walther-2
Hi Jincheng,

2. Serializability of functions: "#2 is very convenient for users" is
only true until users hit their first backwards-compatibility issue. After
that, they will no longer find it so convenient and will ask why the
framework allowed storing such objects in persistent storage. I don't
want to be picky about it, but I wanted to raise awareness that it is
sometimes OK to limit use cases in order to guide users toward developing
backwards-compatible programs.

Thanks for the explanation of the remaining items. It sounds reasonable
to me. Regarding the example with `getKind()`, I actually meant
`org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow
users to override this property, and I think we should do something
similar for the getLanguage property.
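Python has no `final` keyword, so here is a minimal sketch of how a Python base class could stop subclasses from overriding a framework-owned property such as `get_language()`, similar in spirit to `ScalarFunction#getKind` being final in Java. `UserDefinedFunction` and `FunctionLanguage` here are hypothetical stand-ins, not the actual PyFlink classes.

```python
from enum import Enum


class FunctionLanguage(Enum):
    JVM = "JVM"
    PYTHON = "PYTHON"


class UserDefinedFunction:
    """Hypothetical base class whose get_language() is owned by the framework."""

    # Methods that subclasses may not redefine ("final" in Java terms).
    _FINAL_METHODS = ("get_language",)

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        for name in UserDefinedFunction._FINAL_METHODS:
            # cls.__dict__ only holds attributes defined on the new subclass itself.
            if name in cls.__dict__:
                raise TypeError(f"{cls.__name__} must not override {name}()")

    def get_language(self) -> FunctionLanguage:
        return FunctionLanguage.PYTHON


class SubtractOne(UserDefinedFunction):
    def eval(self, i):
        return i - 1


print(SubtractOne().get_language())  # FunctionLanguage.PYTHON

try:
    class Sneaky(UserDefinedFunction):
        def get_language(self):
            return FunctionLanguage.JVM
except TypeError as error:
    print(error)  # Sneaky must not override get_language()
```

`typing.final` also exists, but only static type checkers enforce it. That is why this sketch checks at class-creation time instead.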

Thanks,
Timo

On 03.09.19 15:01, jincheng sun wrote:

> Hi Timo,
>
> Thanks for the quick reply! :)
> I have added more examples for #3 and #5 to the FLIP. Those are great
> suggestions!
>
> Regarding 2:
>
> CloudPickle has two kinds of serialization (which differs from Java):
>   1) For classes and functions that can be imported, CloudPickle only
> serializes the full path of the class or function (just like a Java class
> name).
>   2) For classes and functions that cannot be imported, CloudPickle
> serializes the full content of the class or function.
> For #2, this means that we cannot just store the full path of the class
> or function.
>
> The above serialization is applied recursively.
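You can observe the two cases with the standard library's `pickle`, which only supports case 1 (by reference). This sketch does not use CloudPickle itself. CloudPickle would also handle the lambda in case 2 by serializing its content.

```python
import json
import pickle


def picklable_by_reference(obj) -> bool:
    """Return True if the standard pickle module can serialize obj.

    Standard pickle stores functions and classes by reference only: it
    records the module name and qualified name, and the loading side
    re-imports them. Objects that cannot be looked up that way, such as
    lambdas, are rejected; CloudPickle handles those by serializing their
    full content instead.
    """
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Case 1: an importable function is stored as "module + name" only.
payload = pickle.dumps(json.dumps)
print(b"json" in payload and b"dumps" in payload)  # True
print(pickle.loads(payload) is json.dumps)  # True: re-imported, not rebuilt

# Case 2: a lambda has no importable name, so plain pickle refuses it.
add_one = lambda i: i + 1  # noqa: E731
print(picklable_by_reference(add_one))  # False
```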
>
> However, there is indeed a backwards-compatibility problem when the
> module path of the parent class changes. But I think this is a rare case
> and acceptable: the Flink framework never changes the module path of a
> user-facing interface if it wants to keep backwards compatibility, and if
> users change the interface of a UDF's parent class, they should
> re-register their functions.
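Here is the failure mode in concrete form. A by-reference entry only loads while the referenced module path still exists. The sketch fakes a user module named `my_udfs` (a hypothetical name) with `types.ModuleType` and stores a pickled UDF instance. It then simulates a refactoring by unregistering the module.

```python
import pickle
import sys
import types

# Hypothetical user module "my_udfs" containing a class-based UDF.
module = types.ModuleType("my_udfs")
exec(
    "class SubtractOne:\n"
    "    def eval(self, i):\n"
    "        return i - 1\n",
    module.__dict__,
)
sys.modules["my_udfs"] = module

# The pickled instance references its class as my_udfs.SubtractOne.
stored = pickle.dumps(module.SubtractOne())


def load_or_none(data):
    """Unpickle data, returning None if the referenced module is gone."""
    try:
        return pickle.loads(data)
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        return None


print(load_or_none(stored).eval(5))  # 4

# Simulate moving or renaming the module: the stored entry no longer loads.
del sys.modules["my_udfs"]
print(load_or_none(stored))  # None
```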
>
> If we do not want to support #2, we can store only the full path of the
> class or function; in that case we have no backwards-compatibility
> problem. But I think #2 is very convenient for users.
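Storing only the full path could look like the minimal sketch below. `to_catalog_entry` and `from_catalog_entry` are hypothetical helper names. The sketch accepts only functions and classes that can be re-imported, which is the kind of restriction Timo suggests: lambdas and nested functions are rejected up front.

```python
import importlib
import json


def to_catalog_entry(func) -> str:
    """Store only 'module:qualname', comparable to a Java class name."""
    qualname = func.__qualname__
    # Lambdas ('<lambda>') and nested functions ('f.<locals>.g') cannot be re-imported.
    if "<" in qualname:
        raise ValueError(f"{qualname} is not importable; define it at module level")
    return f"{func.__module__}:{qualname}"


def from_catalog_entry(entry: str):
    """Re-import the function or class referenced by a catalog entry."""
    module_name, _, qualname = entry.partition(":")
    obj = importlib.import_module(module_name)
    for attr in qualname.split("."):
        obj = getattr(obj, attr)
    return obj


entry = to_catalog_entry(json.dumps)
print(entry)  # json:dumps
print(from_catalog_entry(entry) is json.dumps)  # True
```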
>
> What do you think?
>
> Regarding 4:
> As I mentioned earlier, there may be built-in Python functions, and I
> think language is a "function" concept: function and language are
> orthogonal concepts.
> We may have R, Go, and other language functions in the future, not only
> user-defined but also built-in ones.
>
> You are right that users will not set this method; for Python functions,
> it will be set by the framework in the code-generated Java function. So I
> think we should declare getLanguage() in FunctionDefinition for now.
> (I'm not quite sure what you mean by saying that getKind() is final in
> UserDefinedFunction?)
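Here is a minimal sketch of the position that language is independent of both function kind and the built-in/user-defined split. All names are hypothetical, and the `JVM` value follows the naming Timo proposes.

```python
from dataclasses import dataclass
from enum import Enum


class FunctionKind(Enum):
    SCALAR = "SCALAR"
    TABLE = "TABLE"
    AGGREGATE = "AGGREGATE"


class FunctionLanguage(Enum):
    JVM = "JVM"
    PYTHON = "PYTHON"


@dataclass(frozen=True)
class FunctionDefinition:
    """Hypothetical top-level definition with independent kind and language axes."""

    name: str
    kind: FunctionKind
    language: FunctionLanguage
    built_in: bool

    def get_language(self) -> FunctionLanguage:
        return self.language


definitions = [
    FunctionDefinition("plus", FunctionKind.SCALAR, FunctionLanguage.JVM, True),
    # A hypothetical built-in Python implementation of the same operator.
    FunctionDefinition("plus", FunctionKind.SCALAR, FunctionLanguage.PYTHON, True),
    FunctionDefinition("py_udf1", FunctionKind.SCALAR, FunctionLanguage.PYTHON, False),
]

# Both built-in and user-defined functions can report PYTHON, so in this
# model the language query lives on the shared top-level definition.
python_defs = [(d.name, d.built_in) for d in definitions
               if d.get_language() is FunctionLanguage.PYTHON]
print(python_defs)  # [('plus', True), ('py_udf1', False)]
```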
>
> Best,
> Jincheng
>
> On Tue, Sep 3, 2019 at 6:01 PM Timo Walther <[hidden email]> wrote:
>
>> Hi Jincheng,
>>
>> thanks for your response.
>>
>> 2. Serializability of functions: Using an arbitrary serialization
>> format for shipping a function to the workers sounds fine to me. But once
>> we store functions in the catalog, we need to think about backwards
>> compatibility, evolution of interfaces, etc. I'm not sure CloudPickle is
>> the right long-term storage format for this. If we don't think about this
>> in advance, we are basically violating our code quality guide [1], which
>> says never to use Java Serialization, just in a Python flavor: we would be
>> using the RPC serialization format for persistence.
>>
>> 3. TableEnvironment: Can you add some examples to the FLIP? API code
>> like the following is not covered there:
>>
>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
>>                                             DataTypes.BIGINT(),
>>                                             DataTypes.BIGINT()))
>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
>>                                                  DataTypes.BIGINT(),
>>                                                  DataTypes.BIGINT()))
>> self.t_env.register_function("add", add)
>>
>> 4. FunctionDefinition: Your response still doesn't answer my question
>> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
>> "user-defined function" concept and not a "function" concept? In any
>> case, users should not be able to override this method, so it must be
>> final in UserDefinedFunction, similar to getKind().
>>
>> 5. Function characteristics: If UserDefinedFunction is defined in
>> Python, why is it not used in your example in FLIP-58? Could you
>> extend the example in the FLIP to show how to specify these attributes?
>>
>> Regards,
>> Timo
>>
>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
>>
>> On 02.09.19 15:35, jincheng sun wrote:
>>> Hi Timo,
>>>
>>> Many thanks for your feedback. I would like to share my thoughts with
>>> you inline. :)
>>>
>>> Best,
>>> Jincheng
>>>
>>> On Mon, Sep 2, 2019 at 5:04 PM Timo Walther <[hidden email]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> the FLIP looks awesome. However, I would like to discuss the changes to
>>>> the user-facing parts again. Some feedback:
>>>>
>>>> 1. DataViews: With the current non-annotation design for DataViews, we
>>>> cannot perform eager state declaration, right? At which point during
>>>> execution do we know which state is required by the function? We need to
>>>> instantiate the function first, right?
>>>>
>>> We will analyze the Python AggregateFunction and extract the DataViews
>>> used in it. This can be done by instantiating the Python
>>> AggregateFunction, creating an accumulator by calling its
>>> create_accumulator method, and then analyzing the created accumulator.
>>> This is similar to how the code-generation logic handles the Java
>>> AggregateFunction. The extracted DataViews can then be used to construct
>>> the StateDescriptors in the operator, i.e., the Java operator holds the
>>> state spec and the state descriptor id, and the Python worker can access
>>> the state by specifying the corresponding state descriptor id.
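Here is a minimal sketch of that extraction step. It rests on several assumptions: `ListView` and `MapView` are hypothetical marker classes, the accumulator is a plain list, and state ids are generated as `agg_state_<n>`. The real design may differ on all three.

```python
import itertools


class ListView:
    """Hypothetical marker for list state backed by the Java operator."""


class MapView:
    """Hypothetical marker for map state backed by the Java operator."""


class CountDistinct:
    """Hypothetical Python AggregateFunction using a MapView in its accumulator."""

    def create_accumulator(self):
        # [map of values seen so far, running count]
        return [MapView(), 0]


def extract_data_views(agg_function):
    """Create an accumulator and assign a state descriptor id to each DataView.

    Returns (state_id, path_in_accumulator, view_type) tuples; the Java
    operator would build one StateDescriptor per tuple, and the Python
    worker would address that state by state_id.
    """
    accumulator = agg_function.create_accumulator()
    ids = itertools.count()
    specs = []

    def walk(value, path):
        if isinstance(value, (ListView, MapView)):
            specs.append((f"agg_state_{next(ids)}", path, type(value).__name__))
        elif isinstance(value, (list, tuple)):
            for index, item in enumerate(value):
                walk(item, path + (index,))

    walk(accumulator, ())
    return specs


print(extract_data_views(CountDistinct()))  # [('agg_state_0', (0,), 'MapView')]
```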
>>>
>>>
>>>
>>>> 2. Serializability of functions: How do we ensure serializability of
>>>> functions for catalog persistence? In the Scala/Java API, we would like
>>>> to register classes instead of instances soon. This is the only way to
>>>> store a function properly in a catalog or we need some
>>>> serialization/deserialization logic in the function interfaces to
>>>> convert an instance to string properties.
>>>>
>>> The Python function will be serialized with CloudPickle anyway in the
>>> Python API, as we need to transfer it to the Python worker, which can
>>> then deserialize it for execution. The serialized Python function can be
>>> stored in the catalog.
>>>
>>>
>>>
>>>> 3. TableEnvironment: What is the signature of `register_function(self,
>>>> name, function)`? Does it accept both a class and a function, like `class
>>>> Sum` and `def split()`? Could you add some examples for registering both
>>>> kinds of functions?
>>>>
>>> What you mentioned is already supported. You can find an example in the
>>> POC code:
>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
>>>
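Timo asks about a single `register_function(name, function)` that accepts both plain functions and class-based UDFs. That dispatch can be sketched as below. `FunctionRegistry` and this `ScalarFunction` are hypothetical stand-ins, not the POC code. The input and result types passed to `udf(...)` in the real API are also omitted.

```python
from typing import Any, Callable, Dict


class ScalarFunction:
    """Hypothetical base class: subclasses implement eval()."""

    def eval(self, *args):
        raise NotImplementedError


class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1


class FunctionRegistry:
    """Hypothetical stand-in for the TableEnvironment's function catalog."""

    def __init__(self):
        self._functions: Dict[str, Callable[..., Any]] = {}

    def register_function(self, name: str, function) -> None:
        if isinstance(function, ScalarFunction):
            # Class-based UDF instance: dispatch to its eval() method.
            self._functions[name] = function.eval
        elif callable(function):
            # Plain function or lambda.
            self._functions[name] = function
        else:
            raise TypeError(f"cannot register {function!r} as a function")

    def call(self, name: str, *args):
        return self._functions[name](*args)


def add(i, j):
    return i + j


registry = FunctionRegistry()
registry.register_function("add_one", lambda i: i + 1)
registry.register_function("subtract_one", SubtractOne())
registry.register_function("add", add)
print(registry.call("add_one", 1), registry.call("subtract_one", 1), registry.call("add", 1, 2))
# 2 0 3
```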
>>>
>>>> 4. FunctionDefinition: A function definition is not a user-defined
>>>> function definition. It is the top-level interface for both user-defined
>>>> and built-in functions. I'm not sure if getLanguage() should be part of
>>>> this interface or one level down, which would be `UserDefinedFunction`.
>>>> Built-in functions will never be implemented in a different language. In
>>>> any case, I would vote for removing the UNKNOWN language, because it
>>>> does not solve anything. Why should a user declare a function that the
>>>> runtime cannot handle? I also find the term `JAVA` confusing for Scala
>>>> users. How about `FunctionLanguage.JVM` instead?
>>>>
>>> Actually, we may have built-in Python functions in the future. Take the
>>> expression py_udf1(a, b) + py_udf2(c): if there is a built-in Python
>>> function for the '+' operator, we don't need to mix Java and Python
>>> functions, which improves execution performance.
>>> Removing FunctionLanguage.UNKNOWN and renaming FunctionLanguage.Java to
>>> FunctionLanguage.JVM makes more sense to me.
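A toy expression tree makes the performance argument concrete. The `Call` nodes are hypothetical, and column references are omitted. If `+` is a JVM function, evaluating `py_udf1(a, b) + py_udf2(c)` involves two languages, so the results must cross between JVM and Python. A built-in Python `+` keeps the whole tree in one language.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Tuple


class FunctionLanguage(Enum):
    JVM = "JVM"
    PYTHON = "PYTHON"


@dataclass(frozen=True)
class Call:
    """Hypothetical expression node: a function call tagged with its language."""

    name: str
    language: FunctionLanguage
    args: Tuple["Call", ...] = ()


def languages(expr: Call) -> set:
    """Collect every language used anywhere in an expression tree."""
    result = {expr.language}
    for arg in expr.args:
        result |= languages(arg)
    return result


py_udf1 = Call("py_udf1", FunctionLanguage.PYTHON)
py_udf2 = Call("py_udf2", FunctionLanguage.PYTHON)

# '+' evaluated on the JVM: Python results must be shipped back to Java.
mixed = Call("+", FunctionLanguage.JVM, (py_udf1, py_udf2))
# '+' as a hypothetical built-in Python function: the tree stays in Python.
pure = Call("+", FunctionLanguage.PYTHON, (py_udf1, py_udf2))

print(len(languages(mixed)), len(languages(pure)))  # 2 1
```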
>>>
>>>
>>>
>>>> 5. Function characteristics: In the current design, function classes do
>>>> not extend any base class. How can users declare characteristics
>>>> that are present in `FunctionDefinition`, like determinism, requirements,
>>>> or, soon, also monotonicity?
>>>>
>>> Actually, we have defined 'UserDefinedFunction', which is the base class
>>> for all user-defined functions. We can define determinism, requirements,
>>> etc. in this class. Defining determinism is already supported.
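Here is a sketch of declaring such characteristics on the base class. The class and method names are hypothetical, loosely modeled on the discussion. A UDF overrides a default to opt out of determinism.

```python
import random


class UserDefinedFunction:
    """Hypothetical base class carrying function characteristics."""

    def is_deterministic(self) -> bool:
        # Deterministic by default; an optimizer could then e.g. reuse results.
        return True


class ScalarFunction(UserDefinedFunction):
    def eval(self, *args):
        raise NotImplementedError


class AddOne(ScalarFunction):
    def eval(self, i):
        return i + 1


class RandomNoise(ScalarFunction):
    def eval(self, i):
        return i + random.random()

    def is_deterministic(self) -> bool:
        # Results differ between calls, so they must not be reused.
        return False


print(AddOne().is_deterministic(), RandomNoise().is_deterministic())  # True False
```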
>>>
>>>
>>>
>>>> Thanks,
>>>> Timo
>>>>
>>>>
>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
>>>>> Hi Jincheng, Fudian, and Aljoscha,
>>>>> I am assuming the proposed Python UDX can also be applied to Flink SQL.
>>>>> Is this correct? If yes, I would suggest titling the FLIP "Flink Python
>>>>> User-Defined Function" or "Flink Python User-Defined Function for Table".
>>>>> Regards,
>>>>> Shaoxuan
>>>>>
>>>>>
>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the feedback, Bowen!
>>>>>>
>>>>>> Many thanks for creating the FLIP and bringing up the VOTE, Dian!
>>>>>>
>>>>>> Best, Jincheng
>>>>>>
>>>>>> On Wed, Aug 28, 2019 at 11:32 AM Dian Fu <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have started a voting thread [1]. Thanks a lot for your help in
>>>>>>> creating the FLIP, @Jincheng.
>>>>>>>
>>>>>>>
>>>>>>> Hi Bowen,
>>>>>>>
>>>>>>> Many thanks for your comments. I have replied to you in the design doc.
>>>>>>> As the comments don't seem to affect the overall design, I won't
>>>>>>> cancel the vote for now, and we can continue the discussion in the
>>>>>>> design doc.
>>>>>>>
>>>>>>> [1]
>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dian
>>>>>>>
>>>>>>>> On Aug 28, 2019, at 11:05 AM, Bowen Li <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Hi Jincheng and Dian,
>>>>>>>>
>>>>>>>> Sorry for being late to the party. I took a glance at the proposal;
>>>>>>>> it LGTM in general, and I left only a couple of comments.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Bowen
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]> wrote:
>>>>>>>>> Hi Jincheng,
>>>>>>>>>
>>>>>>>>> Thanks! It works.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dian
>>>>>>>>>
>>>>>>>>>> On Aug 27, 2019, at 10:55 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 26, 2019 at 10:52 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>
>>>>>>>>>>> Thank you for the kind tips and the offer of help. I definitely need
>>>>>>>>>>> it! Could you grant me write permission for Confluence? My ID: Dian Fu
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dian
>>>>>>>>>>>
>>>>>>>>>>>> On Aug 26, 2019, at 9:53 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your feedback, Hequn & Dian.
>>>>>>>>>>>>
>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you
>>>>>>>>>>>> complete your first FLIP. Here are some tips:
>>>>>>>>>>>>
>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP
>>>>>>>>>>>>   Template [1] (it's good to learn more about FLIPs by reading [2]).
>>>>>>>>>>>> - Create the Flink Python UDF-related JIRAs after the FLIP vote has
>>>>>>>>>>>>   completed. (I think you can also bring up the VOTE thread, if you
>>>>>>>>>>>>   want!)
>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell
>>>>>>>>>>>> me so that we can solve them together. :)
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 23, 2019 at 11:54 AM Hequn Cheng <[hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> +1 for starting the vote.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks a lot for the discussion, Jincheng.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Hequn
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> +1 to start creating the FLIP and the VOTE on this feature. I'm
>>>>>>>>>>>>>> willing to help with creating the FLIP if you don't mind. As I
>>>>>>>>>>>>>> haven't created a FLIP before, it would be great if you could help
>>>>>>>>>>>>>> with this. :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions
>>>>>>>>>>>>>>> or comments, I think we should initiate a vote to create a FLIP
>>>>>>>>>>>>>>> for Apache Flink Python UDFs.
>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Aug 15, 2019 at 12:54 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder
>>>>>>>>>>>>>>>> about bundle processing.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have added a description of how bundle processing is performed
>>>>>>>>>>>>>>>> from the perspective of checkpoints and watermarks. Feel free to
>>>>>>>>>>>>>>>> leave comments if anything is not described clearly.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Aug 14, 2019 at 10:08 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint" [1]
>>>>>>>>>>>>>>>>> in the design doc which talks about how to handle checkpoints.
>>>>>>>>>>>>>>>>> However, I think you are right that we should say more about it,
>>>>>>>>>>>>>>>>> such as what bundle processing is, how it affects checkpoints and
>>>>>>>>>>>>>>>>> watermarks, how to handle checkpoints and watermarks, etc.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Aug 14, 2019, at 1:01 AM, Thomas Weise <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is very detailed
>>>>>>>>>>>>>>>>>> and thorough, and for me as a Beam Flink runner contributor, easy
>>>>>>>>>>>>>>>>>> to understand :)
>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is bundle
>>>>>>>>>>>>>>>>>> processing. It is critically important for performance that
>>>>>>>>>>>>>>>>>> multiple elements are processed in a bundle. The default bundle
>>>>>>>>>>>>>>>>>> size in the Flink runner is 1s or 1000 elements, whichever comes
>>>>>>>>>>>>>>>>>> first. And for streaming, you can find the logic necessary to
>>>>>>>>>>>>>>>>>> align the bundle processing with watermarks and checkpointing
>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
>>>>>>>>>>>>>>>>>> Thomas
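Thomas describes a bundling rule: flush after 1000 elements or 1 second, whichever comes first. Bundles must also be finished before a checkpoint or watermark is handled. Both requirements fit in a small buffer class. This is a simplified, hypothetical model, not Beam's `ExecutableStageDoFnOperator`. In particular, the time limit here is only checked when an element arrives. A real operator would also need a timer.

```python
import time
from typing import Callable, List, Optional


class Bundler:
    """Buffers elements and hands them to process_bundle in batches.

    A bundle is flushed when it reaches max_size elements or has been open
    for max_time_s seconds, whichever comes first, and it is always flushed
    before a checkpoint or watermark is handled.
    """

    def __init__(self, process_bundle: Callable[[List], None],
                 max_size: int = 1000, max_time_s: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self._process_bundle = process_bundle
        self._max_size = max_size
        self._max_time_s = max_time_s
        self._clock = clock
        self._buffer: List = []
        self._opened_at: Optional[float] = None

    def add(self, element) -> None:
        if not self._buffer:
            self._opened_at = self._clock()
        self._buffer.append(element)
        too_big = len(self._buffer) >= self._max_size
        too_old = self._clock() - self._opened_at >= self._max_time_s
        if too_big or too_old:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._process_bundle(self._buffer)
            self._buffer = []
            self._opened_at = None

    def on_checkpoint(self) -> None:
        # Every buffered element must be processed before the snapshot.
        self.flush()

    def on_watermark(self) -> None:
        # Results for elements before the watermark must be emitted first.
        self.flush()


bundles = []
bundler = Bundler(bundles.append, max_size=3, clock=lambda: 0.0)
for x in range(7):
    bundler.add(x)
bundler.on_checkpoint()
print(bundles)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Injecting `clock` keeps the example deterministic. It also makes the time-based flush testable without sleeping.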
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already
>>>>>>>>>>>>>>>>>>> supported and will be available in the upcoming 1.9 release.
>>>>>>>>>>>>>>>>>>> As Python UDFs are very important for Python users, we'd like to
>>>>>>>>>>>>>>>>>>> start the discussion about Python UDF support in the Python
>>>>>>>>>>>>>>>>>>> Table API.
>>>>>>>>>>>>>>>>>>> Aljoscha Krettek, Dian Fu and I have discussed this offline and
>>>>>>>>>>>>>>>>>>> have drafted a design doc [1]. It includes the following items:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As many people mentioned in the previous discussion thread [2], a
>>>>>>>>>>>>>>>>>>> portability framework was introduced in Apache Beam in its latest
>>>>>>>>>>>>>>>>>>> releases. It provides well-defined, language-neutral data
>>>>>>>>>>>>>>>>>>> structures and protocols for language-neutral user-defined
>>>>>>>>>>>>>>>>>>> function execution. This design is based on Beam's portability
>>>>>>>>>>>>>>>>>>> framework. We will introduce how to make use of Beam's
>>>>>>>>>>>>>>>>>>> portability framework for user-defined function execution: data
>>>>>>>>>>>>>>>>>>> transmission, state access, checkpoints, metrics, logging, etc.
>>>>>>>>>>>>>>>>>>> Considering that the design relies on Beam's portability
>>>>>>>>>>>>>>>>>>> framework for Python user-defined function execution, and not all
>>>>>>>>>>>>>>>>>>> contributors in the Flink community are familiar with Beam's
>>>>>>>>>>>>>>>>>>> portability framework, we have built a prototype [3] as a proof
>>>>>>>>>>>>>>>>>>> of concept and to make the design easier to understand.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc
>>>>>>>>>>>>>>>>>>>
>>


Re: [DISCUSS] Flink Python User-Defined Function for Table API

Aljoscha Krettek-2
Hi,

Things look interesting so far!

I have one question: Where will most of the support code for this live? Will this add the required code to flink-table-common or to the different runners? Can we implement this in such a way that only a minimal amount of support code is required in the parts of the Table API (and Table API runners) that are not Python-specific?

Best,
Aljoscha

> On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
>
> Hi Jincheng,
>
> 2. Serializability of functions: "#2 is very convenient for users" is only true until users hit their first backwards-compatibility issue. After that, they will no longer find it so convenient and will ask why the framework allowed storing such objects in persistent storage. I don't want to be picky about it, but I wanted to raise awareness that it is sometimes OK to limit use cases in order to guide users toward developing backwards-compatible programs.
>
> Thanks for the explanation of the remaining items. It sounds reasonable to me. Regarding the example with `getKind()`, I actually meant `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow users to override this property, and I think we should do something similar for the getLanguage property.
>
> Thanks,
> Timo
>
> On 03.09.19 15:01, jincheng sun wrote:
>> Hi Timo,
>>
>> Thanks for the quick reply ! :)
>> I have added more example for #3 and #5 to the FLIP. That are great
>> suggestions !
>>
>> Regarding 2:
>>
>> There are two kind Serialization for CloudPickle(Which is different from
>> Java):
>>  1) For class and function which can be imported, CloudPickle only
>> serialize the full path of the class and function (just like java class
>> name).
>>  2) For the class and function which can not be imported, CloudPickle will
>> serialize the full content of the class and function.
>> For #2, It means that we can not just store the full path of the class and
>> function.
>>
>> The above serialization is recursive.
>>
>> However, there is indeed an problem of backwards compatibility when the
>> module path of the parent class changed. But I think this is an rare case
>> and acceptable. i.e., For Flink framework we never change the user
>> interface module path if we want to keep backwards compatibility. For user
>> code, if they change the interface of UDF's parent, they should re-register
>> their functions.
>>
>> If we do not want support #2, we can store the full path of class and
>> function, in that case we have no backwards compatibility problem. But I
>> think the #2 is very convenient for users.
>>
>> What do you think?
>>
>> Regarding 4:
>> As I mentioned earlier, there may be built-in Python functions and I think
>> language is a "function" concept. Function and Language are orthogonal
>> concepts.
>> We may have R, GO and other language functions in the future, not only
>> user-defined, but also built-in functions.
>>
>> You are right that users will not set this method and for Python functions,
>> it will be set in the code-generated Java function by the framework. So, I
>> think we should declare the getLanguage() in FunctionDefinition for now.
>> (I'm not pretty sure what do you mean by saying that getKind() is final in
>> UserDefinedFunction?)
>>
>> Best,
>> Jincheng
>>
>> Timo Walther <[hidden email]> 于2019年9月3日周二 下午6:01写道:
>>
>>> Hi Jincheng,
>>>
>>> thanks for your response.
>>>
>>> 2. Serializability of functions: Using some arbitrary serialization
>>> format for shipping a function to worker sounds fine to me. But once we
>>> store functions a the catalog we need to think about backwards
>>> compatibility and evolution of interfaces etc. I'm not sure if
>>> CloudPickle is the right long-term storage format for this. If we don't
>>> think about this in advance, we are basically violating our code quality
>>> guide [1] of never use Java Serialization but in the Python-way. We are
>>> using the RPC serialization for persistence.
>>>
>>> 3. TableEnvironment: Can you add some example to the FLIP? Because API
>>> code like the following is not covered there:
>>>
>>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
>>> DataTypes.BIGINT(),
>>>                                              DataTypes.BIGINT()))
>>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
>>> DataTypes.BIGINT(),
>>> DataTypes.BIGINT()))
>>> self.t_env.register_function("add", add)
>>>
>>> 4. FunctionDefinition: Your response still doesn't answer my question
>>> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
>>> "user-defined function" concept and not a "function" concept. In any
>>> case, all users should not be able to set this method. So it must be
>>> final in UserDefinedFunction similar to getKind().
>>>
>>> 5. Function characteristics: If UserDefinedFunction is defined in
>>> Python, why is it not used in your example in FLIP-58. You could you
>>> extend the example to show how to specify these attributes in the FLIP?
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
>>>
>>> On 02.09.19 15:35, jincheng sun wrote:
>>>> Hi Timo,
>>>>
>>>> Great thanks for your feedback. I would like to share my thoughts with
>>> you
>>>> inline. :)
>>>>
>>>> Best,
>>>> Jincheng
>>>>
>>>> Timo Walther <[hidden email]> 于2019年9月2日周一 下午5:04写道:
>>>>
>>>>> Hi all,
>>>>>
>>>>> the FLIP looks awesome. However, I would like to discuss the changes to
>>>>> the user-facing parts again. Some feedback:
>>>>>
>>>>> 1. DataViews: With the current non-annotation design for DataViews, we
>>>>> cannot perform eager state declaration, right? At which point during
>>>>> execution do we know which state is required by the function? We need to
>>>>> instantiate the function first, right?
>>>>>
>>>>>> We will analysis the Python AggregateFunction and extract the DataViews
>>>> used in the Python AggregateFunction. This can be done
>>>> by instantiate a Python AggregateFunction, creating an accumulator by
>>>> calling method create_accumulator and then analysis the created
>>>> accumulator. This is actually similar to the way that Java
>>>> AggregateFunction processing codegen logic. The extracted DataViews can
>>>> then be used to construct the StateDescriptors in the operator, i.e., we
>>>> should have hold the state spec and the state descriptor id in Java
>>>> operator and Python worker can access the state by specifying the
>>>> corresponding state descriptor id.
>>>>
>>>>
>>>>
>>>>> 2. Serializability of functions: How do we ensure serializability of
>>>>> functions for catalog persistence? In the Scala/Java API, we would like
>>>>> to register classes instead of instances soon. This is the only way to
>>>>> store a function properly in a catalog or we need some
>>>>> serialization/deserialization logic in the function interfaces to
>>>>> convert an instance to string properties.
>>>>>
>>>>>> The Python function will be serialized with CloudPickle anyway in the
>>>> Python API as we need to transfer it to the Python worker which can then
>>>> deserialize it for execution. The serialized Python function can be
>>> stored
>>>> into catalog.
>>>>
>>>>
>>>>
>>>>> 3. TableEnvironment: What is the signature of `register_function(self,
>>>>> name, function)`? Does it accept both a class and function? Like `class
>>>>> Sum` and `def split()`? Could you add some examples for registering both
>>>>> kinds of functions?
>>>>>
>>>>>> It has been already supported which you mentioned. You can find an
>>>> example in the POC code:
>>>>
>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
>>>>
>>>>
>>>>> 4. FunctionDefinition: Function definition is not a user-defined
>>>>> function definition. It is the highest interface for both user-defined
>>>>> and built-in functions. I'm not sure if getLanguage() should be part of
>>>>> this interface or one-level down which would be `UserDefinedFunction`.
>>>>> Built-in functions will never be implemented in a different language. In
>>>>> any case, I would vote for removing the UNKNOWN language, because it
>>>>> does not solve anything. Why should a user declare a function that the
>>>>> runtime can not handle? I also find the term `JAVA` confusing for Scala
>>>>> users. How about `FunctionLanguage.JVM` instead?
>>>>>
>>>>>> Actually we may have built-in Python functions in the future. Regarding
>>>> to the following expression: py_udf1(a, b) + py_udf2(c), if there is
>>>> built-in Python
>>>> funciton for '+' operator, then we don't need to mix using Java and
>>> Python
>>>> UDFs. In this way, we can improve the execution performance.
>>>> Regarding to removing FunctionLanguage.UNKNOWN and renaming
>>>> FunctionLanguage.Java to FunctionLanguage.JVM, it makes more sense to me.
>>>>
>>>>
>>>>
>>>>> 5. Function characteristics: In the current design, function classes do
>>>>> not extend from any upper class. How can users declare characteristics
>>>>> that are present in `FunctionDefinition` like determinism, requirements,
>>>>> or soon also monotonism.
>>>>>
>>>>>> Actually we have defined 'UserDefinedFunction' which is the base class
>>>> for all user-defined functions.
>>>> We can define the deterministic, requirements, etc in this class.
>>>> Currently, we have already supported to define the deterministic.
>>>>
>>>>
>>>>
>>>>> Thanks,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
>>>>>> Hi Jincheng, Fudian, and Aljoscha,
>>>>>> I am assuming the proposed python UDX can also be applied to Flink SQL.
>>>>>> Is this correct? If yes, I would suggest to title the FLIP as "Flink
>>>>> Python
>>>>>> User-Defined Function" or "Flink Python User-Defined Function for
>>> Table".
>>>>>> Regards,
>>>>>> Shaoxuan
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <
>>> [hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the feedback Bowen!
>>>>>>>
>>>>>>> Great thanks for create the FLIP and bring up the VOTE Dian!
>>>>>>>
>>>>>>> Best, Jincheng
>>>>>>>
>>>>>>> Dian Fu <[hidden email]> 于2019年8月28日周三 上午11:32写道:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have started a voting thread [1]. Thanks a lot for your help during
>>>>>>>> creating the FLIP @Jincheng.
>>>>>>>>
>>>>>>>>
>>>>>>>> Hi Bowen,
>>>>>>>>
>>>>>>>> Very appreciated for your comments. I have replied you in the design
>>>>> doc.
>>>>>>>> As it seems that the comments doesn't affect the overall design, I'll
>>>>> not
>>>>>>>> cancel the vote for now and we can continue the discussion in the
>>>>> design
>>>>>>>> doc.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>>
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
>>>>>>>> <
>>>>>>>>
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
>>>>>>>> Regards,
>>>>>>>> Dian
>>>>>>>>
>>>>>>>>> 在 2019年8月28日,上午11:05,Bowen Li <[hidden email]> 写道:
>>>>>>>>>
>>>>>>>>> Hi Jincheng and Dian,
>>>>>>>>>
>>>>>>>>> Sorry for being late to the party. I took a glance at the proposal,
>>>>>>> LGTM
>>>>>>>> in
>>>>>>>>> general, and I left only a couple comments.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Bowen
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]>
>>>>> wrote:
>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>
>>>>>>>>>> Thanks! It works.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Dian
>>>>>>>>>>
>>>>>>>>>>> 在 2019年8月27日,上午10:55,jincheng sun <[hidden email]> 写道:
>>>>>>>>>>>
>>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 26, 2019 at 10:52 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the kind tips and the offer of help. I definitely need it!
>>>>>>>>>>>> Could you grant me write permission for Confluence? My ID: Dian Fu
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Dian
>>>>>>>>>>>>
>>>>>>>>>>>> On Aug 26, 2019, at 9:53 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your feedback, Hequn & Dian.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
>>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you
>>>>>>>>>>>>> complete your first FLIP. Here are some tips:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
>>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP
>>>>>>>>>>>>>   Template [1] (it's better to know more about FLIPs by reading [2]).
>>>>>>>>>>>>> - Create the Flink Python UDF related JIRAs after the VOTE on the
>>>>>>>>>>>>>   FLIP has completed. (I think you can also bring up the VOTE
>>>>>>>>>>>>>   thread, if you want!)
>>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell
>>>>>>>>>>>>> me so that we can solve them together. :)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 11:54 AM Hequn Cheng <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> +1 for starting the vote.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks a lot, Jincheng, for the discussion.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Hequn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1 to creating the FLIP and starting the VOTE on this feature. I'm
>>>>>>>>>>>>>>> willing to help create the FLIP if you don't mind. As I haven't
>>>>>>>>>>>>>>> created a FLIP before, it would be great if you could help with
>>>>>>>>>>>>>>> this. :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions
>>>>>>>>>>>>>>>> or comments, I think it's time to initiate a vote to create a
>>>>>>>>>>>>>>>> FLIP for Apache Flink Python UDFs.
>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Aug 15, 2019 at 12:54 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder
>>>>>>>>>>>>>>>>> about bundle processing.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have added a description of how bundle processing is performed
>>>>>>>>>>>>>>>>> from the perspective of checkpoints and watermarks. Feel free to
>>>>>>>>>>>>>>>>> leave comments if anything is not described clearly.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Aug 14, 2019 at 10:08 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint" [1]
>>>>>>>>>>>>>>>>>> in the design doc which describes how to handle checkpoints.
>>>>>>>>>>>>>>>>>> However, I think you are right that we should say more about
>>>>>>>>>>>>>>>>>> it, such as what bundle processing is, how it affects
>>>>>>>>>>>>>>>>>> checkpoints and watermarks, and how to handle checkpoints and
>>>>>>>>>>>>>>>>>> watermarks.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Aug 14, 2019, at 1:01 AM, Thomas Weise <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is very detailed
>>>>>>>>>>>>>>>>>>> and thorough, and for me as a Beam Flink runner contributor,
>>>>>>>>>>>>>>>>>>> easy to understand :)
>>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is bundle
>>>>>>>>>>>>>>>>>>> processing. It is critically important for performance that
>>>>>>>>>>>>>>>>>>> multiple elements are processed in a bundle. The default bundle
>>>>>>>>>>>>>>>>>>> size in the Flink runner is 1s or 1000 elements, whichever comes
>>>>>>>>>>>>>>>>>>> first. And for streaming, you can find the logic necessary to
>>>>>>>>>>>>>>>>>>> align the bundle processing with watermarks and checkpointing
>>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already
>>>>>>>>>>>>>>>>>>>> supported and will be available in the upcoming 1.9 release.
>>>>>>>>>>>>>>>>>>>> As Python UDFs are very important for Python users, we'd like to
>>>>>>>>>>>>>>>>>>>> start the discussion about Python UDF support in the Python
>>>>>>>>>>>>>>>>>>>> Table API. Aljoscha Krettek, Dian Fu and I have discussed this
>>>>>>>>>>>>>>>>>>>> offline and have drafted a design doc [1]. It includes the
>>>>>>>>>>>>>>>>>>>> following items:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
>>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> As mentioned by many people in the previous discussion thread [2],
>>>>>>>>>>>>>>>>>>>> a portability framework was introduced in Apache Beam in recent
>>>>>>>>>>>>>>>>>>>> releases. It provides well-defined, language-neutral data
>>>>>>>>>>>>>>>>>>>> structures and protocols for language-neutral user-defined
>>>>>>>>>>>>>>>>>>>> function execution. This design is based on Beam's portability
>>>>>>>>>>>>>>>>>>>> framework. We will describe how to make use of Beam's
>>>>>>>>>>>>>>>>>>>> portability framework for user-defined function execution: data
>>>>>>>>>>>>>>>>>>>> transmission, state access, checkpoints, metrics, logging, etc.
>>>>>>>>>>>>>>>>>>>> Considering that the design relies on Beam's portability
>>>>>>>>>>>>>>>>>>>> framework for Python user-defined function execution and that
>>>>>>>>>>>>>>>>>>>> not all contributors in the Flink community are familiar with
>>>>>>>>>>>>>>>>>>>> it, we have built a prototype [3] as a proof of concept and to
>>>>>>>>>>>>>>>>>>>> make the design easier to understand.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
>>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc
>>>>>>>>>>>>>>>>>>>>
>>>
>
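Thomas Weise's point about bundle processing, quoted above, can be illustrated with a small sketch. It assumes a bundle is closed when either a size limit or a time limit is reached (he cites 1000 elements or 1s, whichever comes first, as the Beam Flink runner default) and that the current bundle is flushed before a checkpoint or watermark is forwarded. `BundleBuffer` and its methods are hypothetical names; this is not the Beam or Flink implementation.

```python
import time
from typing import Callable, List, Optional


class BundleBuffer:
    """Hypothetical buffer that batches elements before handing them to a Python worker.

    A bundle is closed when it holds `max_elements` elements or when
    `max_time_ms` milliseconds have passed since its first element,
    whichever comes first.
    """

    def __init__(self, process_bundle: Callable[[List[object]], None],
                 max_elements: int = 1000, max_time_ms: int = 1000,
                 clock: Callable[[], float] = time.monotonic):
        self._process_bundle = process_bundle
        self._max_elements = max_elements
        self._max_time_s = max_time_ms / 1000.0
        self._clock = clock
        self._elements: List[object] = []
        self._bundle_start: Optional[float] = None

    def add(self, element: object) -> None:
        if not self._elements:
            self._bundle_start = self._clock()  # first element opens a bundle
        self._elements.append(element)
        self._maybe_finish()

    def on_timer(self) -> None:
        # Called periodically so that a slow stream still flushes on time.
        self._maybe_finish()

    def _maybe_finish(self) -> None:
        if not self._elements:
            return
        full = len(self._elements) >= self._max_elements
        expired = self._clock() - self._bundle_start >= self._max_time_s
        if full or expired:
            self.finish_bundle()

    def finish_bundle(self) -> None:
        if self._elements:
            bundle, self._elements = self._elements, []
            self._bundle_start = None
            self._process_bundle(bundle)

    def prepare_snapshot(self) -> None:
        # Assumption: before a checkpoint barrier (or a watermark) is forwarded,
        # every buffered element is processed, so the snapshot and the emitted
        # results stay consistent.
        self.finish_bundle()
```

Injecting the clock keeps the time-based trigger testable; with `clock=time.monotonic` the same class would flush on wall-clock time.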


Re: [DISCUSS] Flink Python User-Defined Function for Table API

jincheng sun
Hi Timo,

Yes, I think convenience is a trade-off. Weighing convenience against always
doing the right thing, for now I prefer to add some limitations that ensure
users never encounter such issues. So, on #2 we are on the same page now.

Best,
Jincheng
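The trade-off on #2 can be seen with Python's standard `pickle` module, which, like the first CloudPickle mode described in this thread, serializes an importable function by reference (module path plus name) rather than by value. The sketch uses a hypothetical module name, `my_udfs_v1`, to show that such a payload stops loading once that module path disappears, which is the backwards-compatibility risk Timo raised for catalog persistence. Standard `pickle` refuses lambdas outright; CloudPickle would instead serialize their content by value, which is not shown here.

```python
import pickle
import sys
import types


def make_module(name: str) -> types.ModuleType:
    """Create and register a throwaway module holding one UDF-like function."""
    module = types.ModuleType(name)
    # Functions defined this way get __module__ == name, so they are importable.
    exec("def add_one(i):\n    return i + 1\n", module.__dict__)
    sys.modules[name] = module
    return module


def by_reference_roundtrip():
    module = make_module("my_udfs_v1")  # hypothetical user module
    payload = pickle.dumps(module.add_one)
    # Only the module path and the function name are stored, not the body.
    stores_path = b"my_udfs_v1" in payload and b"add_one" in payload
    same_object = pickle.loads(payload) is module.add_one

    # Simulate the module being renamed or removed after the payload was persisted.
    del sys.modules["my_udfs_v1"]
    try:
        pickle.loads(payload)
        still_loads = True
    except ImportError:
        still_loads = False
    return stores_path, same_object, still_loads


def lambda_is_rejected() -> bool:
    # A lambda cannot be looked up by name, so by-reference pickling fails.
    try:
        pickle.dumps(lambda i: i + 1)
    except (pickle.PicklingError, AttributeError):
        return True
    return False


if __name__ == "__main__":
    print(by_reference_roundtrip())  # (True, True, False)
    print(lambda_is_rejected())      # True
```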

On Wed, Sep 4, 2019 at 8:58 PM Aljoscha Krettek <[hidden email]> wrote:

> Hi,
>
> Things look interesting so far!
>
> I had one question: Where will most of the support code for this live?
> Will this add the required code to flink-table-common or to the different
> runners? Can we implement this in such a way that only a minimal amount of
> support code is required in the parts of the Table API (and Table API
> runners) that are not Python-specific?
>
> Best,
> Aljoscha
>
> > On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
> >
> > Hi Jincheng,
> >
> > 2. Serializability of functions: "#2 is very convenient for users" holds
> > only until they hit their first backwards-compatibility issue; after that
> > they will not find it so convenient anymore and will ask why the framework
> > allowed storing such objects in persistent storage. I don't want to be
> > picky about it, but I wanted to raise awareness that sometimes it is ok to
> > limit use cases in order to guide users toward developing
> > backwards-compatible programs.
> >
> > Thanks for the explanation of the remaining items. It sounds reasonable
> > to me. Regarding the example with `getKind()`, I actually meant
> > `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow
> > users to override this property, and I think we should do something
> > similar for the getLanguage property.
> >
> > Thanks,
> > Timo
> >
> > On 03.09.19 15:01, jincheng sun wrote:
> >> Hi Timo,
> >>
> >> Thanks for the quick reply! :)
> >> I have added more examples for #3 and #5 to the FLIP. Those are great
> >> suggestions!
> >>
> >> Regarding 2:
> >>
> >> There are two kinds of serialization in CloudPickle (which differs from
> >> Java):
> >>  1) For classes and functions that can be imported, CloudPickle only
> >> serializes the full path of the class or function (just like a Java
> >> class name).
> >>  2) For classes and functions that cannot be imported, CloudPickle
> >> serializes the full content of the class or function.
> >> For #2, this means that we cannot just store the full path of the class
> >> or function.
> >>
> >> The above serialization is recursive.
> >>
> >> However, there is indeed a backwards-compatibility problem when the
> >> module path of the parent class changes. But I think this is a rare case
> >> and acceptable. E.g., in the Flink framework we never change the module
> >> path of a user-facing interface if we want to keep backwards
> >> compatibility. For user code, if users change the interface of a UDF's
> >> parent, they should re-register their functions.
> >>
> >> If we do not want to support #2, we can store the full path of the class
> >> or function; in that case we have no backwards-compatibility problem. But
> >> I think #2 is very convenient for users.
> >>
> >> What do you think?
> >>
> >> Regarding 4:
> >> As I mentioned earlier, there may be built-in Python functions, and I
> >> think language is a "function" concept. Function and language are
> >> orthogonal concepts.
> >> We may have R, Go and other language functions in the future, not only
> >> user-defined, but also built-in functions.
> >>
> >> You are right that users will not set this method; for Python functions,
> >> it will be set in the code-generated Java function by the framework. So
> >> I think we should declare getLanguage() in FunctionDefinition for now.
> >> (I'm not quite sure what you mean by saying that getKind() is final in
> >> UserDefinedFunction?)
> >>
> >> Best,
> >> Jincheng
> >>
> >>
> >> On Tue, Sep 3, 2019 at 6:01 PM Timo Walther <[hidden email]> wrote:
> >>
> >>> Hi Jincheng,
> >>>
> >>> thanks for your response.
> >>>
> >>> 2. Serializability of functions: Using some arbitrary serialization
> >>> format for shipping a function to the workers sounds fine to me. But
> >>> once we store functions in the catalog, we need to think about backwards
> >>> compatibility, evolution of interfaces, etc. I'm not sure if CloudPickle
> >>> is the right long-term storage format for this. If we don't think about
> >>> this in advance, we are basically violating our code quality guide [1]
> >>> of never using Java serialization, just in the Python way: we are using
> >>> the RPC serialization for persistence.
> >>>
> >>> 3. TableEnvironment: Can you add some examples to the FLIP? Because API
> >>> code like the following is not covered there:
> >>>
> >>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
> >>>                                             DataTypes.BIGINT(),
> >>>                                             DataTypes.BIGINT()))
> >>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
> >>>                                                  DataTypes.BIGINT(),
> >>>                                                  DataTypes.BIGINT()))
> >>> self.t_env.register_function("add", add)
> >>>
> >>> 4. FunctionDefinition: Your response still doesn't answer my question
> >>> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
> >>> "user-defined function" concept and not a "function" concept? In any
> >>> case, users should not be able to set this method. So it must be
> >>> final in UserDefinedFunction, similar to getKind().
> >>>
> >>> 5. Function characteristics: If UserDefinedFunction is defined in
> >>> Python, why is it not used in your example in FLIP-58? Could you
> >>> extend the example to show how to specify these attributes in the FLIP?
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
> >>>
> >>> On 02.09.19 15:35, jincheng sun wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Many thanks for your feedback. I would like to share my thoughts with
> >>>> you inline. :)
> >>>>
> >>>> Best,
> >>>> Jincheng
> >>>>
> >>>> On Mon, Sep 2, 2019 at 5:04 PM Timo Walther <[hidden email]> wrote:
> >>>>
> >>>>> Hi all,
> >>>>>
> >>>>> the FLIP looks awesome. However, I would like to discuss the changes to
> >>>>> the user-facing parts again. Some feedback:
> >>>>>
> >>>>> 1. DataViews: With the current non-annotation design for DataViews, we
> >>>>> cannot perform eager state declaration, right? At which point during
> >>>>> execution do we know which state is required by the function? We need
> >>>>> to instantiate the function first, right?
> >>>>>
> >>>> We will analyze the Python AggregateFunction and extract the DataViews
> >>>> used in it. This can be done by instantiating the Python
> >>>> AggregateFunction, creating an accumulator by calling the method
> >>>> create_accumulator, and then analyzing the created accumulator. This is
> >>>> actually similar to the way the code-generation logic processes a Java
> >>>> AggregateFunction. The extracted DataViews can then be used to construct
> >>>> the StateDescriptors in the operator, i.e., the Java operator should
> >>>> hold the state spec and the state descriptor ID, and the Python worker
> >>>> can access the state by specifying the corresponding state descriptor ID.
> >>>>
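The extraction step described in the answer to point 1 (instantiate the aggregate function, call `create_accumulator`, inspect the accumulator for DataViews, and give each one a state descriptor ID shared by the Java operator and the Python worker) might be sketched as follows. `ListView`, `MapView`, `StateSpec`, `WeightedAvg` and the `agg<index>$<field>` ID scheme are hypothetical stand-ins, not the FLIP-58 or PyFlink API.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


class ListView:
    """Hypothetical state-backed list used inside an accumulator."""


class MapView:
    """Hypothetical state-backed map used inside an accumulator."""


@dataclass(frozen=True)
class StateSpec:
    descriptor_id: str  # shared by the Java operator and the Python worker
    field_name: str     # accumulator field that holds the view
    kind: str           # "list" or "map"


class WeightedAvg:
    """Example aggregate function whose accumulator contains DataViews."""

    def create_accumulator(self) -> Dict[str, Any]:
        return {"sum": 0, "count": 0, "history": ListView(), "per_key": MapView()}


def extract_state_specs(agg_function_class, agg_index: int) -> List[StateSpec]:
    # Instantiate the function and build one accumulator, then look for views.
    accumulator = agg_function_class().create_accumulator()
    specs = []
    for field_name, value in accumulator.items():
        if isinstance(value, (ListView, MapView)):
            kind = "list" if isinstance(value, ListView) else "map"
            specs.append(StateSpec(f"agg{agg_index}${field_name}", field_name, kind))
    return specs
```

Because the specs are derived before any record is processed, an operator could register the matching state descriptors up front, with the caveat Timo pointed out: the function has to be instantiated first.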
> >>>>
> >>>>> 2. Serializability of functions: How do we ensure serializability of
> >>>>> functions for catalog persistence? In the Scala/Java API, we would like
> >>>>> to register classes instead of instances soon. This is the only way to
> >>>>> store a function properly in a catalog, or we need some
> >>>>> serialization/deserialization logic in the function interfaces to
> >>>>> convert an instance to string properties.
> >>>>>
> >>>> The Python function will be serialized with CloudPickle anyway in the
> >>>> Python API, as we need to transfer it to the Python worker, which can
> >>>> then deserialize it for execution. The serialized Python function can be
> >>>> stored in the catalog.
> >>>>
> >>>>> 3. TableEnvironment: What is the signature of `register_function(self,
> >>>>> name, function)`? Does it accept both a class and a function? Like
> >>>>> `class Sum` and `def split()`? Could you add some examples for
> >>>>> registering both kinds of functions?
> >>>>>
> >>>> What you mentioned is already supported. You can find an example in
> >>>> the POC code:
> >>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
> >>>>
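Question 3 asks whether `register_function` accepts both plain functions (including lambdas) and instances of callable classes. The sketch below shows one way a registry could accept both forms behind a single wrapper. `ToyTableEnvironment`, `WrappedUdf` and the string type names are hypothetical and deliberately differ from PyFlink's real `udf` and `DataTypes` API used in Timo's snippet.

```python
import inspect
from typing import Any, Callable, Dict, List


class WrappedUdf:
    """Hypothetical wrapper pairing a Python callable with its declared types."""

    def __init__(self, func: Callable[..., Any], input_types: List[str], result_type: str):
        if not callable(func):
            raise TypeError(f"{func!r} is not callable")
        self.func = func
        self.input_types = input_types
        self.result_type = result_type
        # Record which form was passed: a plain function/lambda or a callable object.
        self.form = "function" if inspect.isfunction(func) else "callable object"

    def __call__(self, *args: Any) -> Any:
        return self.func(*args)


class ToyTableEnvironment:
    def __init__(self) -> None:
        self._functions: Dict[str, WrappedUdf] = {}

    def register_function(self, name: str, function: WrappedUdf) -> None:
        if not isinstance(function, WrappedUdf):
            raise TypeError("wrap the callable in WrappedUdf(...) first")
        self._functions[name] = function

    def call(self, name: str, *args: Any) -> Any:
        return self._functions[name](*args)


class SubtractOne:
    """A UDF defined as a class whose instances are callable."""

    def __call__(self, i: int) -> int:
        return i - 1
```

Wrapping both forms in one type keeps `register_function` itself simple; the declared input and result types travel with the callable regardless of how it was defined.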
> >>>>
> >>>>> 4. FunctionDefinition: Function definition is not a user-defined
> >>>>> function definition. It is the highest interface for both
> user-defined
> >>>>> and built-in functions. I'm not sure if getLanguage() should be part
> of
> >>>>> this interface or one-level down which would be
> `UserDefinedFunction`.
> >>>>> Built-in functions will never be implemented in a different
> language. In
> >>>>> any case, I would vote for removing the UNKNOWN language, because it
> >>>>> does not solve anything. Why should a user declare a function that
> the
> >>>>> runtime can not handle? I also find the term `JAVA` confusing for
> Scala
> >>>>> users. How about `FunctionLanguage.JVM` instead?
> >>>>>
> >>>>>> Actually we may have built-in Python functions in the future.
> Regarding
> >>>> to the following expression: py_udf1(a, b) + py_udf2(c), if there is
> >>>> built-in Python
> >>>> funciton for '+' operator, then we don't need to mix using Java and
> >>> Python
> >>>> UDFs. In this way, we can improve the execution performance.
> >>>> Regarding to removing FunctionLanguage.UNKNOWN and renaming
> >>>> FunctionLanguage.Java to FunctionLanguage.JVM, it makes more sense to
> me.
> >>>>
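The argument for attaching a language to every function definition, including built-ins, is that an expression such as `py_udf1(a, b) + py_udf2(c)` can run entirely in one Python worker only if `+` is also available as a Python function. A minimal sketch of that check, using the `FunctionLanguage` values agreed on in this exchange (JVM and PYTHON, with UNKNOWN dropped); the `Column` and `Call` expression classes are hypothetical and not planner code.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Set, Union


class FunctionLanguage(Enum):
    JVM = "JVM"
    PYTHON = "PYTHON"


@dataclass
class Column:
    name: str


@dataclass
class Call:
    function: str
    language: FunctionLanguage
    args: List[Union["Call", Column]] = field(default_factory=list)


def languages_used(expr) -> Set[FunctionLanguage]:
    """Collect the languages of all function calls in an expression tree."""
    if isinstance(expr, Column):
        return set()
    result = {expr.language}
    for arg in expr.args:
        result |= languages_used(arg)
    return result


def runs_in_single_python_worker(expr) -> bool:
    # The whole expression can be shipped to one Python worker only if
    # every call in it is a Python function.
    return languages_used(expr) == {FunctionLanguage.PYTHON}
```

With a JVM `+`, the two Python results would have to cross back to the JVM before being added; with a Python `+`, the whole tree stays on one side.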
> >>>>
> >>>>
> >>>>> 5. Function characteristics: In the current design, function classes
> do
> >>>>> not extend from any upper class. How can users declare
> characteristics
> >>>>> that are present in `FunctionDefinition` like determinism,
> requirements,
> >>>>> or soon also monotonism.
> >>>>>
> >>>>>> Actually we have defined 'UserDefinedFunction' which is the base
> class
> >>>> for all user-defined functions.
> >>>> We can define the deterministic, requirements, etc in this class.
> >>>> Currently, we have already supported to define the deterministic.
> >>>>
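The answer to point 5 says a Python base class for user-defined functions already lets users declare determinism. A minimal sketch of that idea, assuming a method named `is_deterministic()` that defaults to True and is overridden by functions whose output can vary for the same input. The class names ending in `Sketch` are hypothetical, and PyFlink's actual base class may differ in names and details.

```python
import random


class UserDefinedFunctionSketch:
    """Hypothetical base class carrying function characteristics."""

    def is_deterministic(self) -> bool:
        # Deterministic by default: a planner may reuse or pre-compute results.
        return True


class ScalarFunctionSketch(UserDefinedFunctionSketch):
    def eval(self, *args):
        raise NotImplementedError


class AddOne(ScalarFunctionSketch):
    def eval(self, i):
        return i + 1


class RandomNoise(ScalarFunctionSketch):
    def eval(self, i):
        return i + random.random()

    def is_deterministic(self) -> bool:
        # The same input may give a different output, so a planner must not
        # treat calls as constants or deduplicate them.
        return False
```

Keeping the characteristic on a shared base class is what gives function classes the "upper class" Timo asked about; further characteristics would be added the same way.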
> >>>>
> >>>>
> >>>>> Thanks,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
> >>>>>> Hi Jincheng, Fudian, and Aljoscha,
> >>>>>> I am assuming the proposed Python UDX can also be applied to Flink SQL.
> >>>>>> Is this correct? If yes, I would suggest titling the FLIP "Flink Python
> >>>>>> User-Defined Function" or "Flink Python User-Defined Function for
> >>>>>> Table".
> >>>>>> Regards,
> >>>>>> Shaoxuan
> >>>>>>
> >>>>>>
> >
>
>

Re: [DISCUSS] Flink Python User-Defined Function for Table API

jincheng sun
In reply to this post by Aljoscha Krettek-2
Hi Aljoscha,

That's a good point. So far, most of the code will live in the flink-python
module; the rules and RelNodes will be put into both the blink and flink
planner modules, and some of the common interfaces required by the planners
will be placed in flink-table-common. I think you are right: we should try to
keep the changes for this feature minimal. We will follow this principle when
reviewing the PRs.

Many thanks for your questions and the reminder!

Best,
Jincheng


Aljoscha Krettek <[hidden email]> wrote on Wed, Sep 4, 2019 at 8:58 PM:

> Hi,
>
> Things look interesting so far!
>
> I had one question: Where will most of the support code for this live?
> Will this add the required code to flink-table-common or the different
> runners? Can we implement this in such a way that only a minimal amount of
> support code is required in the parts of the Table API (and Table API
> runners) that are not Python-specific?
>
> Best,
> Aljoscha
>
> > On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
> >
> > Hi Jincheng,
> >
> > 2. Serializability of functions: "#2 is very convenient for users" holds only until users run into their first backwards-compatibility issue. After that, they will not find it so convenient anymore and will ask why the framework allowed storing such objects in persistent storage. I don't want to be picky about it, but I wanted to raise awareness that sometimes it is OK to limit use cases to guide users toward developing backwards-compatible programs.
> >
> > Thanks for the explanation of the remaining items. It sounds reasonable to me. Regarding the example with `getKind()`, I actually meant `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow users to override this property, and I think we should do something similar for the getLanguage property.
> >
> > Thanks,
> > Timo
> >
> > On 03.09.19 15:01, jincheng sun wrote:
> >> Hi Timo,
> >>
> >> Thanks for the quick reply! :)
> >> I have added more examples for #3 and #5 to the FLIP. Those are great suggestions!
> >>
> >> Regarding 2:
> >>
> >> CloudPickle has two kinds of serialization (which differs from Java):
> >>  1) For classes and functions that can be imported, CloudPickle only serializes the full path of the class or function (just like a Java class name).
> >>  2) For classes and functions that cannot be imported, CloudPickle serializes the full content of the class or function.
> >> Because of #2, we cannot simply store the full path of the class or function.
> >>
> >> The above serialization is recursive.
> >>
> >> However, there is indeed a backwards-compatibility problem when the module path of the parent class changes. But I think this is a rare case and acceptable: for the Flink framework, we never change the module path of a user-facing interface if we want to keep backwards compatibility. For user code, if users change the interface of a UDF's parent class, they should re-register their functions.
> >>
> >> If we do not want to support #2, we can store the full path of the class or function; in that case there is no backwards-compatibility problem. But I think #2 is very convenient for users.
> >>
> >> What do you think?
> >>
> >> Regarding 4:
> >> As I mentioned earlier, there may be built-in Python functions, and I think language is a "function" concept. Function and language are orthogonal concepts. We may have R, Go and other language functions in the future, not only user-defined but also built-in ones.
> >>
> >> You are right that users will not set this method; for Python functions, it will be set in the code-generated Java function by the framework. So I think we should declare getLanguage() in FunctionDefinition for now. (I'm not quite sure what you mean by saying that getKind() is final in UserDefinedFunction?)
> >>
> >> Best,
> >> Jincheng
> >>
> >> Timo Walther <[hidden email]> wrote on Tue, Sep 3, 2019 at 6:01 PM:
> >>
> >>> Hi Jincheng,
> >>>
> >>> thanks for your response.
> >>>
> >>> 2. Serializability of functions: Using some arbitrary serialization format for shipping a function to the worker sounds fine to me. But once we store functions in the catalog, we need to think about backwards compatibility, evolution of interfaces, etc. I'm not sure if CloudPickle is the right long-term storage format for this. If we don't think about this in advance, we are basically violating our code quality guide [1] (never use Java serialization), just in the Python way: we would be using the RPC serialization format for persistence.
> >>>
> >>> 3. TableEnvironment: Can you add some examples to the FLIP? API code like the following is not covered there:
> >>>
> >>> self.t_env.register_function("add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))
> >>> self.t_env.register_function("subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))
> >>> self.t_env.register_function("add", add)
> >>>
> >>> 4. FunctionDefinition: Your response still doesn't answer my question entirely. Why do we need FunctionDefinition.getLanguage() if this is a "user-defined function" concept and not a "function" concept? In any case, users should not be able to set this method, so it must be final in UserDefinedFunction, similar to getKind().
> >>>
> >>> 5. Function characteristics: If UserDefinedFunction is defined in Python, why is it not used in your example in FLIP-58? Could you extend the example in the FLIP to show how to specify these attributes?
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
> >>>
> >>> On 02.09.19 15:35, jincheng sun wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Many thanks for your feedback. I would like to share my thoughts with you inline. :)
> >>>>
> >>>> Best,
> >>>> Jincheng
> >>>>
> >>>> Timo Walther <[hidden email]> wrote on Mon, Sep 2, 2019 at 5:04 PM:
> >>>>
> >>>>> Hi all,
> >>>>>
> >>>>> the FLIP looks awesome. However, I would like to discuss the changes to the user-facing parts again. Some feedback:
> >>>>>
> >>>>> 1. DataViews: With the current non-annotation design for DataViews, we cannot perform eager state declaration, right? At which point during execution do we know which state is required by the function? We need to instantiate the function first, right?
> >>>>>
> >>>> We will analyze the Python AggregateFunction and extract the DataViews used in it. This can be done by instantiating the Python AggregateFunction, creating an accumulator by calling the method create_accumulator, and then analyzing the created accumulator. This is similar to the way the code generation logic processes a Java AggregateFunction. The extracted DataViews can then be used to construct the StateDescriptors in the operator, i.e., the Java operator holds the state spec and the state descriptor ID, and the Python worker can access the state by specifying the corresponding state descriptor ID.
> >>>>
> >>>>> 2. Serializability of functions: How do we ensure serializability of functions for catalog persistence? In the Scala/Java API, we would like to register classes instead of instances soon. This is the only way to store a function properly in a catalog, or we need some serialization/deserialization logic in the function interfaces to convert an instance to string properties.
> >>>>>
> >>>> The Python function will be serialized with CloudPickle anyway in the Python API, as we need to transfer it to the Python worker, which can then deserialize it for execution. The serialized Python function can be stored in the catalog.
> >>>>
> >>>>> 3. TableEnvironment: What is the signature of `register_function(self, name, function)`? Does it accept both a class and a function, like `class Sum` and `def split()`? Could you add some examples for registering both kinds of functions?
> >>>>>
> >>>> This is already supported. You can find an example in the POC code:
> >>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
> >>>>
> >>>>> 4. FunctionDefinition: Function definition is not a user-defined function definition. It is the highest interface for both user-defined and built-in functions. I'm not sure if getLanguage() should be part of this interface or one level down, which would be `UserDefinedFunction`. Built-in functions will never be implemented in a different language. In any case, I would vote for removing the UNKNOWN language, because it does not solve anything. Why should a user declare a function that the runtime cannot handle? I also find the term `JAVA` confusing for Scala users. How about `FunctionLanguage.JVM` instead?
> >>>>>
> >>>> Actually, we may have built-in Python functions in the future. Regarding the expression py_udf1(a, b) + py_udf2(c): if there is a built-in Python function for the '+' operator, then we don't need to mix Java and Python UDFs, which can improve execution performance.
> >>>> Removing FunctionLanguage.UNKNOWN and renaming FunctionLanguage.JAVA to FunctionLanguage.JVM makes sense to me.
> >>>>
> >>>>> 5. Function characteristics: In the current design, function classes do not extend any upper class. How can users declare characteristics that are present in `FunctionDefinition`, like determinism, requirements, or soon also monotonism?
> >>>>>
> >>>> Actually, we have defined 'UserDefinedFunction', which is the base class for all user-defined functions. We can define determinism, requirements, etc. in this class. Currently, we already support defining determinism.
> >>>>
> >>>>> Thanks,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
> >>>>>> Hi Jincheng, Fudian, and Aljoscha,
> >>>>>> I am assuming the proposed Python UDX can also be applied to Flink SQL. Is this correct? If yes, I would suggest titling the FLIP "Flink Python User-Defined Function" or "Flink Python User-Defined Function for Table".
> >>>>>> Regards,
> >>>>>> Shaoxuan
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <[hidden email]> wrote:
> >>>>>>
> >>>>>>> Thanks for the feedback, Bowen!
> >>>>>>>
> >>>>>>> Many thanks for creating the FLIP and bringing up the VOTE, Dian!
> >>>>>>>
> >>>>>>> Best, Jincheng
> >>>>>>>
> >>>>>>> Dian Fu <[hidden email]> wrote on Wed, Aug 28, 2019 at 11:32 AM:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I have started a voting thread [1]. Thanks a lot for your help in creating the FLIP, @Jincheng.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Hi Bowen,
> >>>>>>>>
> >>>>>>>> Thank you very much for your comments. I have replied to you in the design doc. As the comments don't seem to affect the overall design, I won't cancel the vote for now, and we can continue the discussion in the design doc.
> >>>>>>>>
> >>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
> >>>>>>>> Regards,
> >>>>>>>> Dian
> >>>>>>>>
> >>>>>>>>> On Aug 28, 2019, at 11:05 AM, Bowen Li <[hidden email]> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Jincheng and Dian,
> >>>>>>>>>
> >>>>>>>>> Sorry for being late to the party. I took a glance at the proposal; it LGTM in general, and I left only a couple of comments.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Bowen
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]> wrote:
> >>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>
> >>>>>>>>>> Thanks! It works.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Dian
> >>>>>>>>>>
> >>>>>>>>>>> On Aug 27, 2019, at 10:55 AM, jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Dian Fu <[hidden email]> wrote on Mon, Aug 26, 2019 at 10:52 AM:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I appreciate the kind tips and the offer of help. I definitely need it!
> >>>>>>>>>>>> Could you grant me write permission for Confluence? My ID: Dian Fu
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Dian
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Aug 26, 2019, at 9:53 AM, jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for your feedback, Hequn & Dian.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP! Everyone has a first time, and I am very willing to help you complete your first FLIP. Here are some tips:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
> >>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP template [1]. (It's better to learn more about FLIPs by reading [2] first.)
> >>>>>>>>>>>>> - Create the Flink Python UDF-related JIRAs after the FLIP VOTE has completed. (I think you can also bring up the VOTE thread, if you want!)
> >>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell me, and we can solve them together. :)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> >>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >>>>>>>>>>>>> Hequn Cheng <[hidden email]> wrote on Fri, Aug 23, 2019 at 11:54 AM:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> +1 for starting the vote.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks a lot, Jincheng, for the discussion.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best, Hequn
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
> >>>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> +1 to start creating the FLIP and voting on this feature. I'm willing to help create the FLIP if you don't mind. As I haven't created a FLIP before, it would be great if you could help with this. :)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions or comments, I think it's time to initiate a vote to create a FLIP for Apache Flink Python UDFs.
> >>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best, Jincheng
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> jincheng sun <[hidden email]> wrote on Thu, Aug 15, 2019 at 12:54 AM:
> >>>>>>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder about bundle processing.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I have added a description of how bundle processing is performed from the perspective of checkpoints and watermarks. Feel free to leave comments if anything is not described clearly.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Dian Fu <[hidden email]> wrote on Wed, Aug 14, 2019 at 10:08 AM:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint" [1] in the design doc which talks about how to handle checkpoints. However, I think you are right that we should talk more about it, such as what bundle processing is, how it affects checkpoints and watermarks, how to handle checkpoints and watermarks, etc.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Aug 14, 2019, at 1:01 AM, Thomas Weise <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is very detailed and thorough, and for me as a Beam Flink runner contributor it is easy to understand :)
> >>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is the bundle processing. It is critically important for performance that multiple elements are processed in a bundle. The default bundle size in the Flink runner is 1s or 1000 elements, whichever comes first. And for streaming, you can find the logic necessary to align the bundle processing with watermarks and checkpointing here:
> >>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
> >>>>>>>>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already supported and will be available in the upcoming 1.9 release.
> >>>>>>>>>>>>>>>>>>>> As Python UDFs are very important for Python users, we'd like to start the discussion about Python UDF support in the Python Table API.
> >>>>>>>>>>>>>>>>>>>> Aljoscha Krettek, Dian Fu and I have discussed this offline and have drafted a design doc [1]. It includes the following items:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
> >>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> As mentioned by many people in the previous discussion thread [2], a portability framework was introduced in Apache Beam in recent releases. It provides well-defined, language-neutral data structures and protocols for language-neutral user-defined function execution. This design is based on Beam's portability framework. We will describe how to make use of Beam's portability framework for user-defined function execution: data transmission, state access, checkpoint, metrics, logging, etc.
> >>>>>>>>>>>>>>>>>>>> Considering that the design relies on Beam's portability framework for Python user-defined function execution, and that not all contributors in the Flink community are familiar with that framework, we have built a prototype [3] as a proof of concept and to make the design easier to understand.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
> >>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc
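The bundling behaviour Thomas Weise describes in the quoted thread (a bundle is closed after 1 s or 1000 elements, whichever comes first, and has to be aligned with watermarks and checkpoints) can be sketched in plain Python as below. The `Bundler` class, its method names and the injectable clock are hypothetical; this is not the logic of Beam's `ExecutableStageDoFnOperator`, and the sketch assumes the surrounding operator calls `finish_bundle()` itself before a checkpoint or before forwarding a watermark.

```python
import time
from typing import Callable, List, Optional


class Bundler:
    """Hypothetical buffer that hands elements to a flush callback in bundles.

    A bundle is closed once it holds max_elements, or once max_millis have
    passed since its first element, whichever happens first.
    """

    def __init__(self, flush: Callable[[List[object]], None],
                 max_elements: int = 1000, max_millis: float = 1000.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush
        self._max_elements = max_elements
        self._max_millis = max_millis
        self._clock = clock  # returns seconds; injectable for testing
        self._buffer: List[object] = []
        self._started_at: Optional[float] = None

    def add(self, element: object) -> None:
        if not self._buffer:
            self._started_at = self._clock()
        self._buffer.append(element)
        if len(self._buffer) >= self._max_elements:
            self.finish_bundle()

    def on_timer(self) -> None:
        # Called periodically by the operator; closes a bundle that is too old.
        if self._started_at is not None:
            elapsed_millis = (self._clock() - self._started_at) * 1000
            if elapsed_millis >= self._max_millis:
                self.finish_bundle()

    def finish_bundle(self) -> None:
        # Also meant to be called before a checkpoint or before forwarding a
        # watermark, so that no buffered element is left unprocessed.
        if self._buffer:
            self._flush(self._buffer)
            self._buffer = []
            self._started_at = None


# Usage with a fake clock and a small bundle size.
now = [0.0]
bundles: List[List[object]] = []
bundler = Bundler(bundles.append, max_elements=3, clock=lambda: now[0])
for x in range(7):
    bundler.add(x)
now[0] = 1.5  # 1.5 s later, the partial bundle [6] is old enough
bundler.on_timer()
```

Passing the clock in as a parameter keeps the time-based trigger testable without sleeping.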

Re: [DISCUSS] Flink Python User-Defined Function for Table API

Aljoscha Krettek-2
Hi,

Another thing to consider is the scope of the FLIP. Currently, we try to support (stateful) AggregateFunctions. I have some concerns about whether or not DataView/MapView/ListView is a good interface, because it requires quite some magic from the runners to make it work, such as messing with the TypeInformation and injecting objects at runtime. If the FLIP aims for the minimum, i.e., ScalarFunctions plus the whole execution harness, that should be easier to agree on.
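The DataView handling described in the quoted thread (instantiate the Python AggregateFunction, call `create_accumulator`, scan the accumulator for DataViews, and derive state descriptors with IDs), together with the runtime injection referred to above, might look roughly like the following sketch. `MapView`, `ListView`, `StateSpec`, the dict-shaped accumulator and the dict standing in for keyed state are all hypothetical stand-ins, not PyFlink or Flink APIs.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


class MapView:
    """Hypothetical placeholder for a map-shaped DataView."""


class ListView:
    """Hypothetical placeholder for a list-shaped DataView."""


@dataclass
class StateSpec:
    state_id: str    # id shared by the Java operator and the Python worker
    field_name: str  # accumulator field that holds the view
    kind: str        # "map" or "list"


class CollectDistinct:
    """Example aggregate whose accumulator holds two DataViews and a counter."""

    def create_accumulator(self) -> Dict[str, Any]:
        return {"seen": MapView(), "order": ListView(), "count": 0}


def extract_data_views(agg_function: Any) -> List[StateSpec]:
    # The views are only known after instantiating the function and creating
    # an accumulator, which is why state cannot be declared eagerly.
    accumulator = agg_function.create_accumulator()
    specs: List[StateSpec] = []
    for name, value in accumulator.items():
        if isinstance(value, (MapView, ListView)):
            kind = "map" if isinstance(value, MapView) else "list"
            specs.append(StateSpec(f"state_{len(specs)}", name, kind))
    return specs


def inject_state(accumulator: Dict[str, Any], specs: List[StateSpec],
                 keyed_state: Dict[str, Any]) -> Dict[str, Any]:
    # Runtime "magic": swap each placeholder for a handle bound to its state
    # id. A plain dict stands in for Flink keyed state in this sketch.
    for spec in specs:
        default: Any = {} if spec.kind == "map" else []
        accumulator[spec.field_name] = keyed_state.setdefault(spec.state_id, default)
    return accumulator


specs = extract_data_views(CollectDistinct())
keyed_state: Dict[str, Any] = {}
acc = inject_state(CollectDistinct().create_accumulator(), specs, keyed_state)
acc["seen"]["flink"] = True  # writes go straight to the backing state
acc["order"].append("flink")
```

The sketch makes the concern tangible: the state layout is only known after running user code, and accumulator fields are replaced behind the user's back at runtime.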

Another point is the naming of the new methods. I think Timo hinted at the fact that we have to consider catalog support for functions. There is ongoing work on differentiating between temporary objects and objects that are stored in a catalog (FLIP-64 [1]). With this in mind, the method for registering functions should be called register_temporary_function() and so on, unless we already want to think about mixing Python and Java functions in the catalog, which I think is outside the scope of this FLIP.
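The distinction between temporary and catalog functions can be illustrated with a toy registry. This is a hypothetical sketch: apart from `register_temporary_function`, which is the name proposed above, the class and method names are invented for illustration, and the shadowing rule is an assumption based on the temporary-object semantics discussed in FLIP-64.

```python
from typing import Callable, Dict


class FunctionRegistry:
    """Hypothetical registry keeping temporary and catalog functions apart."""

    def __init__(self) -> None:
        self._temporary: Dict[str, Callable] = {}
        self._catalog: Dict[str, Callable] = {}

    def register_temporary_function(self, name: str, fn: Callable) -> None:
        # Session-scoped and never persisted.
        self._temporary[name] = fn

    def create_catalog_function(self, name: str, fn: Callable) -> None:
        # Persisted; a real catalog would need a stable, evolvable format here.
        self._catalog[name] = fn

    def drop_temporary_function(self, name: str) -> None:
        del self._temporary[name]

    def lookup(self, name: str) -> Callable:
        # Assumed rule: a temporary function shadows a catalog function with
        # the same name.
        if name in self._temporary:
            return self._temporary[name]
        return self._catalog[name]


registry = FunctionRegistry()
registry.create_catalog_function("add_one", lambda i: i + 1)
registry.register_temporary_function("add_one", lambda i: i + 100)
shadowed = registry.lookup("add_one")(1)  # temporary version wins
registry.drop_temporary_function("add_one")
fallback = registry.lookup("add_one")(1)  # catalog version again
```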

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module


> On 5. Sep 2019, at 05:01, jincheng sun <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> Those are good points. So far, most of the code will live in the flink-python
> module. The rules and RelNodes will be put into both the Blink and the Flink
> planner modules, and some common interfaces required by the planners will be
> placed in flink-table-common. I think you are right that we should try to keep
> the changes for this feature minimal, and we will follow this principle when
> reviewing the PRs.
>
> Many thanks for your questions and the reminder!
>
> Best,
> Jincheng
>
>
> Aljoscha Krettek <[hidden email]> wrote on Wed, Sep 4, 2019 at 8:58 PM:
>
>> Hi,
>>
>> Things look interesting so far!
>>
>> I had one question: Where will most of the support code for this live?
>> Will this add the required code to flink-table-common or the different
>> runners? Can we implement this in such a way that only a minimal amount of
>> support code is required in the parts of the Table API (and Table API
>> runners) that are not Python-specific?
>>
>> Best,
>> Aljoscha
>>
>>> On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
>>>
>>> Hi Jincheng,
>>>
>>> 2. Serializability of functions: "#2 is very convenient for users" holds only until users run into their first backwards-compatibility issue. After that, they will not find it so convenient anymore and will ask why the framework allowed storing such objects in persistent storage. I don't want to be picky about it, but I wanted to raise awareness that sometimes it is OK to limit use cases to guide users toward developing backwards-compatible programs.
>>>
>>> Thanks for the explanation of the remaining items. It sounds reasonable to me. Regarding the example with `getKind()`, I actually meant `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow users to override this property, and I think we should do something similar for the getLanguage property.
>>>
>>> Thanks,
>>> Timo
>>>
>>> On 03.09.19 15:01, jincheng sun wrote:
>>>> Hi Timo,
>>>>
>>>> Thanks for the quick reply! :)
>>>> I have added more examples for #3 and #5 to the FLIP. Those are great suggestions!
>>>>
>>>> Regarding 2:
>>>>
>>>> CloudPickle has two kinds of serialization (which differs from Java):
>>>>  1) For classes and functions that can be imported, CloudPickle only serializes the full path of the class or function (just like a Java class name).
>>>>  2) For classes and functions that cannot be imported, CloudPickle serializes the full content of the class or function.
>>>> Because of #2, we cannot simply store the full path of the class or function.
>>>>
>>>> The above serialization is recursive.
>>>>
>>>> However, there is indeed a backwards-compatibility problem when the module path of the parent class changes. But I think this is a rare case and acceptable: for the Flink framework, we never change the module path of a user-facing interface if we want to keep backwards compatibility. For user code, if users change the interface of a UDF's parent class, they should re-register their functions.
>>>>
>>>> If we do not want to support #2, we can store the full path of the class or function; in that case there is no backwards-compatibility problem. But I think #2 is very convenient for users.
>>>>
>>>> What do you think?
>>>>
>>>> Regarding 4:
>>>> As I mentioned earlier, there may be built-in Python functions, and I think language is a "function" concept. Function and language are orthogonal concepts. We may have R, Go and other language functions in the future, not only user-defined but also built-in ones.
>>>>
>>>> You are right that users will not set this method; for Python functions, it will be set in the code-generated Java function by the framework. So I think we should declare getLanguage() in FunctionDefinition for now. (I'm not quite sure what you mean by saying that getKind() is final in UserDefinedFunction?)
>>>>
>>>> Best,
>>>> Jincheng
>>>>
>>>> Timo Walther <[hidden email]> wrote on Tue, Sep 3, 2019 at 6:01 PM:
>>>>
>>>>> Hi Jincheng,
>>>>>
>>>>> thanks for your response.
>>>>>
>>>>> 2. Serializability of functions: Using some arbitrary serialization format for shipping a function to the worker sounds fine to me. But once we store functions in the catalog, we need to think about backwards compatibility, evolution of interfaces, etc. I'm not sure if CloudPickle is the right long-term storage format for this. If we don't think about this in advance, we are basically violating our code quality guide [1] (never use Java serialization), just in the Python way: we would be using the RPC serialization format for persistence.
>>>>>
>>>>> 3. TableEnvironment: Can you add some examples to the FLIP? API code like the following is not covered there:
>>>>>
>>>>> self.t_env.register_function("add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))
>>>>> self.t_env.register_function("subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))
>>>>> self.t_env.register_function("add", add)
>>>>>
>>>>> 4. FunctionDefinition: Your response still doesn't answer my question entirely. Why do we need FunctionDefinition.getLanguage() if this is a "user-defined function" concept and not a "function" concept? In any case, users should not be able to set this method, so it must be final in UserDefinedFunction, similar to getKind().
>>>>>
>>>>> 5. Function characteristics: If UserDefinedFunction is defined in Python, why is it not used in your example in FLIP-58? Could you extend the example in the FLIP to show how to specify these attributes?
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
>>>>>
>>>>> On 02.09.19 15:35, jincheng sun wrote:
>>>>>> Hi Timo,
>>>>>>
>>>>>> Great thanks for your feedback. I would like to share my thoughts with
>>>>> you
>>>>>> inline. :)
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>> Timo Walther <[hidden email]> 于2019年9月2日周一 下午5:04写道:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> the FLIP looks awesome. However, I would like to discuss the changes
>> to
>>>>>>> the user-facing parts again. Some feedback:
>>>>>>>
>>>>>>> 1. DataViews: With the current non-annotation design for DataViews,
>> we
>>>>>>> cannot perform eager state declaration, right? At which point during
>>>>>>> execution do we know which state is required by the function? We
>> need to
>>>>>>> instantiate the function first, right?
>>>>>>>
>>>>>>>> We will analysis the Python AggregateFunction and extract the
>> DataViews
>>>>>> used in the Python AggregateFunction. This can be done
>>>>>> by instantiate a Python AggregateFunction, creating an accumulator by
>>>>>> calling method create_accumulator and then analysis the created
>>>>>> accumulator. This is actually similar to the way that Java
>>>>>> AggregateFunction processing codegen logic. The extracted DataViews
>> can
>>>>>> then be used to construct the StateDescriptors in the operator, i.e.,
>> we
>>>>>> should have hold the state spec and the state descriptor id in Java
>>>>>> operator and Python worker can access the state by specifying the
>>>>>> corresponding state descriptor id.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> 2. Serializability of functions: How do we ensure serializability of
>>>>>>> functions for catalog persistence? In the Scala/Java API, we would
>> like
>>>>>>> to register classes instead of instances soon. This is the only way
>> to
>>>>>>> store a function properly in a catalog or we need some
>>>>>>> serialization/deserialization logic in the function interfaces to
>>>>>>> convert an instance to string properties.
>>>>>>>
>>>>>>>> The Python function will be serialized with CloudPickle anyway in
>> the
>>>>>> Python API as we need to transfer it to the Python worker which can
>> then
>>>>>> deserialize it for execution. The serialized Python function can be
>>>>> stored
>>>>>> into catalog.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> 3. TableEnvironment: What is the signature of `register_function(self,
>>>>>>> name, function)`? Does it accept both a class and a function? Like
>>>>>>> `class Sum` and `def split()`? Could you add some examples for
>>>>>>> registering both kinds of functions?
>>>>>>>
>>>>>>>> What you mention is already supported. You can find an example in the
>>>>>> POC code:
>>>>>>
>>>>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
>>>>>>
>>>>>>
>>>>>>> 4. FunctionDefinition: Function definition is not a user-defined
>>>>>>> function definition. It is the highest interface for both user-defined
>>>>>>> and built-in functions. I'm not sure if getLanguage() should be part of
>>>>>>> this interface or one level down, which would be `UserDefinedFunction`.
>>>>>>> Built-in functions will never be implemented in a different language. In
>>>>>>> any case, I would vote for removing the UNKNOWN language, because it
>>>>>>> does not solve anything. Why should a user declare a function that the
>>>>>>> runtime cannot handle? I also find the term `JAVA` confusing for Scala
>>>>>>> users. How about `FunctionLanguage.JVM` instead?
>>>>>>>
>>>>>>>> Actually, we may have built-in Python functions in the future. Regarding
>>>>>> the expression py_udf1(a, b) + py_udf2(c): if there is a built-in Python
>>>>>> function for the '+' operator, then we don't need to mix Java and Python
>>>>>> UDFs, which can improve execution performance.
>>>>>> Regarding removing FunctionLanguage.UNKNOWN and renaming
>>>>>> FunctionLanguage.Java to FunctionLanguage.JVM, that makes sense to me.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> 5. Function characteristics: In the current design, function classes do
>>>>>>> not extend from any upper class. How can users declare characteristics
>>>>>>> that are present in `FunctionDefinition`, like determinism,
>>>>>>> requirements, or soon also monotonism?
>>>>>>>
>>>>>>>> Actually, we have defined 'UserDefinedFunction', which is the base class
>>>>>> for all user-defined functions. We can define determinism, requirements,
>>>>>> etc. in this class. Defining determinism is already supported.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> Thanks,
>>>>>>> Timo
>>>>>>>
>>>>>>>
>>>>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
>>>>>>>> Hi Jincheng, Fudian, and Aljoscha,
>>>>>>>> I am assuming the proposed Python UDX can also be applied to Flink SQL.
>>>>>>>> Is this correct? If yes, I would suggest titling the FLIP "Flink Python
>>>>>>>> User-Defined Function" or "Flink Python User-Defined Function for Table".
>>>>>>>> Regards,
>>>>>>>> Shaoxuan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the feedback, Bowen!
>>>>>>>>>
>>>>>>>>> Many thanks to Dian for creating the FLIP and bringing up the VOTE!
>>>>>>>>>
>>>>>>>>> Best, Jincheng
>>>>>>>>>
>>>>>>>>> On Wed, Aug 28, 2019 at 11:32 AM, Dian Fu <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I have started a voting thread [1]. Thanks a lot for your help in
>>>>>>>>>> creating the FLIP, @Jincheng.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hi Bowen,
>>>>>>>>>>
>>>>>>>>>> Thank you very much for your comments. I have replied to you in the
>>>>>>>>>> design doc. As the comments don't seem to affect the overall design, I
>>>>>>>>>> won't cancel the vote for now, and we can continue the discussion in
>>>>>>>>>> the design doc.
>>>>>>>>>>
>>>>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dian
>>>>>>>>>>
>>>>>>>>>>> On Aug 28, 2019, at 11:05 AM, Bowen Li <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Jincheng and Dian,
>>>>>>>>>>>
>>>>>>>>>>> Sorry for being late to the party. I took a glance at the proposal.
>>>>>>>>>>> LGTM in general; I left only a couple of comments.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Bowen
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks! It works.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Dian
>>>>>>>>>>>>
>>>>>>>>>>>>> On Aug 27, 2019, at 10:55 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 10:52 AM, Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I appreciate the kind tips and the offer of help. I definitely need it!
>>>>>>>>>>>>>> Could you grant me write permission for Confluence? My ID: Dian Fu
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Aug 26, 2019, at 9:53 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your feedback, Hequn & Dian.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
>>>>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you
>>>>>>>>>>>>>>> complete your first FLIP. Here are some tips:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
>>>>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP Template
>>>>>>>>>>>>>>>   [1] (it's better to learn more about FLIPs by reading [2]).
>>>>>>>>>>>>>>> - Create the Flink Python UDF related JIRAs after the FLIP VOTE
>>>>>>>>>>>>>>>   completes. (I think you can also bring up the VOTE thread, if you
>>>>>>>>>>>>>>>   want!)
>>>>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell
>>>>>>>>>>>>>>> me so that we can solve them together. :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 11:54 AM, Hequn Cheng <[hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> +1 for starting the vote.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks a lot, Jincheng, for the discussion.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best, Hequn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> +1 to creating the FLIP and starting the VOTE on this feature. I'm
>>>>>>>>>>>>>>>>> willing to help create the FLIP if you don't mind. As I haven't
>>>>>>>>>>>>>>>>> created a FLIP before, it would be great if you could help with this. :)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions or
>>>>>>>>>>>>>>>>>> comments, I think it's better to initiate a vote to create a FLIP
>>>>>>>>>>>>>>>>>> for Apache Flink Python UDFs.
>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Aug 15, 2019 at 12:54 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder about
>>>>>>>>>>>>>>>>>>> bundle processing.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I have added a description of how bundle processing is performed
>>>>>>>>>>>>>>>>>>> from the perspective of checkpoints and watermarks. Feel free to
>>>>>>>>>>>>>>>>>>> leave comments if anything is not described clearly.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Aug 14, 2019 at 10:08 AM, Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint" [1] in
>>>>>>>>>>>>>>>>>>>> the design doc which talks about how to handle checkpoints.
>>>>>>>>>>>>>>>>>>>> However, I think you are right that we should say more about it,
>>>>>>>>>>>>>>>>>>>> such as what bundle processing is, how it affects checkpoints and
>>>>>>>>>>>>>>>>>>>> watermarks, and how to handle checkpoints and watermarks.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Aug 14, 2019, at 1:01 AM, Thomas Weise <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is very detailed and
>>>>>>>>>>>>>>>>>>>>> thorough, and for me, as a Beam Flink runner contributor, easy to
>>>>>>>>>>>>>>>>>>>>> understand :)
>>>>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is bundle processing.
>>>>>>>>>>>>>>>>>>>>> It is critically important for performance that multiple elements
>>>>>>>>>>>>>>>>>>>>> are processed in a bundle. The default bundle size in the Flink
>>>>>>>>>>>>>>>>>>>>> runner is 1s or 1000 elements, whichever comes first. And for
>>>>>>>>>>>>>>>>>>>>> streaming, you can find the logic necessary to align bundle
>>>>>>>>>>>>>>>>>>>>> processing with watermarks and checkpointing here:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already
>>>>>>>>>>>>>>>>>>>>>> supported and will be available in the upcoming 1.9 release.
>>>>>>>>>>>>>>>>>>>>>> As Python UDFs are very important for Python users, we'd like to
>>>>>>>>>>>>>>>>>>>>>> start the discussion about Python UDF support in the Python Table API.
>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek, Dian Fu and I have discussed this offline and have
>>>>>>>>>>>>>>>>>>>>>> drafted a design doc [1]. It covers the following items:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
>>>>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> As many people mentioned in the previous discussion thread [2], a
>>>>>>>>>>>>>>>>>>>>>> portability framework was introduced in recent Apache Beam releases.
>>>>>>>>>>>>>>>>>>>>>> It provides well-defined, language-neutral data structures and
>>>>>>>>>>>>>>>>>>>>>> protocols for language-neutral user-defined function execution. This
>>>>>>>>>>>>>>>>>>>>>> design is based on Beam's portability framework. We describe how to
>>>>>>>>>>>>>>>>>>>>>> use it for user-defined function execution: data transmission, state
>>>>>>>>>>>>>>>>>>>>>> access, checkpointing, metrics, logging, etc.
>>>>>>>>>>>>>>>>>>>>>> Since the design relies on Beam's portability framework for Python
>>>>>>>>>>>>>>>>>>>>>> user-defined function execution, and not all contributors in the
>>>>>>>>>>>>>>>>>>>>>> Flink community are familiar with it, we have built a prototype [3]
>>>>>>>>>>>>>>>>>>>>>> as a proof of concept and to make the design easier to understand.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
>>>>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc
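Thomas's bundling rule quoted above (a bundle is closed after 1 second or 1000 elements, whichever comes first, and bundles are aligned with watermarks and checkpoints) can be illustrated with a small sketch. `Bundler` is a hypothetical helper, not a Flink or Beam class; it only assumes that something calls `on_timer()` periodically and calls `finish_bundle()` before a watermark is forwarded or a checkpoint is taken.

```python
import time
from typing import Callable, List, Optional


class Bundler:
    """Hypothetical helper: buffers elements and hands them to
    process_bundle once max_size elements have arrived or max_wait seconds
    have passed since the bundle was opened, whichever comes first."""

    def __init__(
        self,
        process_bundle: Callable[[List[object]], None],
        max_size: int = 1000,
        max_wait: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._process_bundle = process_bundle
        self._max_size = max_size
        self._max_wait = max_wait
        self._clock = clock
        self._buffer: List[object] = []
        self._opened_at: Optional[float] = None

    def add(self, element: object) -> None:
        if not self._buffer:
            self._opened_at = self._clock()  # a new bundle starts here
        self._buffer.append(element)
        if len(self._buffer) >= self._max_size:
            self.finish_bundle()

    def on_timer(self) -> None:
        # Meant to be called periodically, e.g. from a processing-time timer.
        if self._buffer and self._clock() - self._opened_at >= self._max_wait:
            self.finish_bundle()

    def finish_bundle(self) -> None:
        # Should also be called before forwarding a watermark or snapshotting
        # state for a checkpoint, so no element is left only in the buffer.
        if self._buffer:
            bundle, self._buffer = self._buffer, []
            self._opened_at = None
            self._process_bundle(bundle)
```

The clock is injectable so the time-based trigger can be tested deterministically; a real operator would rely on the runtime's timer service instead of polling.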


Re: [DISCUSS] Flink Python User-Defined Function for Table API

jincheng sun
Hi Aljoscha,

Thanks for your comments!

Regarding the FLIP scope, it seems that we have agreed on the design of
the stateless function support.
What do you think about starting development of the stateless function
support first and continuing the discussion of stateful function support?
Or do you think we should split the current FLIP into two FLIPs and discuss
stateful function support in another thread?

Currently, the design of the Python DataView/MapView/ListView interfaces
follows the Java/Scala naming conventions.
Of course, we can continue to discuss whether there are better solutions,
e.g., using annotations.
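One way to picture the annotation-based alternative: if an accumulator declares its views through type annotations, the framework can learn which state an aggregate needs from the declaration alone, without instantiating anything, which is what eager state declaration requires. This is only an illustrative sketch; `DataView`, `ListView`, `MapView`, `CountDistinctAccumulator` and `declared_views` are hypothetical names, not PyFlink API, and only the standard `typing.get_type_hints` is assumed.

```python
import typing
from typing import Dict


class DataView:
    """Hypothetical marker base class for state-backed views."""


class ListView(DataView):
    pass


class MapView(DataView):
    pass


class CountDistinctAccumulator:
    # Fields are declared with annotations only; no instance is ever created
    # to find out which state the aggregate needs.
    seen: MapView
    count: int


def declared_views(acc_type: type) -> Dict[str, str]:
    """Return {field name: view kind} for every DataView-typed annotation."""
    hints = typing.get_type_hints(acc_type)
    return {
        name: hint.__name__
        for name, hint in hints.items()
        if isinstance(hint, type) and issubclass(hint, DataView)
    }
```

For example, `declared_views(CountDistinctAccumulator)` yields `{"seen": "MapView"}`; plain fields such as `count` are ignored.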

Regarding the magic logic needed to support DataView/MapView/ListView, it
will be done by the framework and is transparent to users.
As I understand it, this magic logic is unavoidable no matter what the
interfaces look like.
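To make the framework-side logic more concrete, here is a sketch of the approach described earlier in the thread: instantiate the Python AggregateFunction, call `create_accumulator`, and inspect the resulting accumulator for DataViews, each of which would back one state descriptor in the Java operator. The classes and the `<FunctionClass>.<field>` id scheme are hypothetical illustrations, not the FLIP's actual implementation.

```python
from typing import Any, Dict, List


class DataView:
    """Hypothetical stand-in for a state-backed view (not a PyFlink class)."""


class ListView(DataView):
    def __init__(self) -> None:
        self.items: List[Any] = []


class MapView(DataView):
    def __init__(self) -> None:
        self.entries: Dict[Any, Any] = {}


class CountDistinct:
    """Example user AggregateFunction; its accumulator is [MapView, count]."""

    def create_accumulator(self) -> list:
        return [MapView(), 0]


def extract_state_specs(agg_function: Any) -> Dict[str, str]:
    """Instantiate one accumulator and map a hypothetical state descriptor id
    ("<FunctionClass>.<field>") to the kind of each DataView found in it."""
    acc = agg_function.create_accumulator()
    if isinstance(acc, (list, tuple)):
        fields = {str(i): value for i, value in enumerate(acc)}
    elif hasattr(acc, "__dict__"):
        fields = vars(acc)
    else:
        fields = {}
    prefix = type(agg_function).__name__
    return {
        f"{prefix}.{name}": type(value).__name__
        for name, value in fields.items()
        if isinstance(value, DataView)
    }
```

Note that this only works once the function has been instantiated and `create_accumulator` has run, which is exactly the limitation behind the question about eager state declaration.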

Regarding catalog support for Python functions:
1) If a function is stored in memory as a temporary object, then, as you
said, users can call TableEnvironment.register_function (which will be
renamed to register_temporary_function as part of FLIP-64).
2) If it's persisted in external storage, users can call
Catalog.create_function. As I understand it, no API change is needed.
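The two registration paths can be contrasted in a small sketch. `ToyFunctionCatalog` and its methods are hypothetical: they only borrow the names used in this thread and are not the PyFlink or FLIP-64 API. The sketch assumes persisted functions are stored by fully qualified import path (the "store the full path of the class and function" option discussed in this thread) rather than as a CloudPickle payload.

```python
import importlib
import json
from typing import Callable, Dict


class ToyFunctionCatalog:
    """Hypothetical catalog (not the PyFlink or FLIP-64 API). Temporary
    functions live in memory; persisted functions are stored only as a
    JSON record holding an importable "module:qualname" path."""

    def __init__(self) -> None:
        self._temporary: Dict[str, Callable] = {}
        self._persisted: Dict[str, str] = {}  # name -> JSON string

    def register_temporary_function(self, name: str, fn: Callable) -> None:
        # Any callable, including a lambda, is fine: it never leaves the session.
        self._temporary[name] = fn

    def create_function(self, name: str, fn: Callable) -> None:
        if "<" in fn.__qualname__:
            # Lambdas and nested functions ("<lambda>", "f.<locals>.g")
            # have no stable import path, so they cannot be stored by reference.
            raise ValueError(f"{fn.__qualname__} cannot be persisted by reference")
        path = f"{fn.__module__}:{fn.__qualname__}"
        self._persisted[name] = json.dumps({"path": path})

    def lookup(self, name: str) -> Callable:
        if name in self._temporary:
            return self._temporary[name]
        path = json.loads(self._persisted[name])["path"]
        module_name, qualname = path.split(":")
        target = importlib.import_module(module_name)
        for attr in qualname.split("."):
            target = getattr(target, attr)  # resolves nested names like Cls.method
        return target
```

Storing only the path avoids persisting pickled code, which is the compatibility risk raised in this thread, but it rejects lambdas and other non-importable functions (those can stay temporary) and still breaks if the module path changes.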

What do you think?

Best,
Jincheng

On Thu, Sep 5, 2019 at 5:32 PM, Aljoscha Krettek <[hidden email]> wrote:

> Hi,
>
> Another thing to consider is the scope of the FLIP. Currently, we try to
> support (stateful) AggregateFunctions. I have some concerns about whether
> or not DataView/MapView/ListView is a good interface, because it requires
> quite some magic from the runners to make it work, such as messing with the
> TypeInformation and injecting objects at runtime. If the FLIP aims for the
> minimum of ScalarFunctions and the whole execution harness, that should be
> easier to agree on.
>
> Another point is the naming of the new methods. I think Timo hinted at the
> fact that we have to consider catalog support for functions. There is
> ongoing work on differentiating between temporary objects and objects
> that are stored in a catalog (FLIP-64 [1]). With this in mind, the method
> for registering functions should be called register_temporary_function(),
> and so on, unless we already want to think about mixing Python and Java
> functions in the catalog, which I think is outside the scope of this FLIP.
>
> Best,
> Aljoscha
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
>
>
> > On 5. Sep 2019, at 05:01, jincheng sun <[hidden email]> wrote:
> >
> > Hi Aljoscha,
> >
> > That's a good point. So far, most of the code will live in the flink-python
> > module, the rules and RelNodes will be put into both the Blink and the Flink
> > planner modules, and some of the common interfaces required by the planners
> > will be placed in flink-table-common. I think you are right that we should
> > try to keep the changes for this feature minimal. We will follow this
> > principle when reviewing the PRs.
> >
> > Many thanks for your questions and the reminder!
> >
> > Best,
> > Jincheng
> >
> >
> > On Wed, Sep 4, 2019 at 8:58 PM, Aljoscha Krettek <[hidden email]> wrote:
> >
> >> Hi,
> >>
> >> Things look interesting so far!
> >>
> >> I had one question: Where will most of the support code for this live?
> >> Will this add the required code to flink-table-common or to the different
> >> runners? Can we implement this in such a way that only a minimal amount of
> >> support code is required in the parts of the Table API (and Table API
> >> runners) that are not Python-specific?
> >>
> >> Best,
> >> Aljoscha
> >>
> >>> On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
> >>>
> >>> Hi Jincheng,
> >>>
> >>> 2. Serializability of functions: "#2 is very convenient for users" holds
> >>> only until they run into their first backwards-compatibility issue; after
> >>> that, they will find it not so convenient anymore and will ask why the
> >>> framework allowed storing such objects in persistent storage. I don't want
> >>> to be picky about it, but I wanted to raise awareness that sometimes it is
> >>> OK to limit use cases to guide users toward developing
> >>> backwards-compatible programs.
> >>>
> >>> Thanks for the explanation of the remaining items. It sounds reasonable
> >>> to me. Regarding the example with `getKind()`, I actually meant
> >>> `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow
> >>> users to override this property, and I think we should do something
> >>> similar for the getLanguage property.
> >>>
> >>> Thanks,
> >>> Timo
> >>>
> >>> On 03.09.19 15:01, jincheng sun wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Thanks for the quick reply ! :)
> >>>> I have added more example for #3 and #5 to the FLIP. That are great
> >>>> suggestions !
> >>>>
> >>>> Regarding 2:
> >>>>
> >>>> There are two kind Serialization for CloudPickle(Which is different
> from
> >>>> Java):
> >>>> 1) For class and function which can be imported, CloudPickle only
> >>>> serialize the full path of the class and function (just like java
> class
> >>>> name).
> >>>> 2) For the class and function which can not be imported, CloudPickle
> >> will
> >>>> serialize the full content of the class and function.
> >>>> For #2, It means that we can not just store the full path of the class
> >> and
> >>>> function.
> >>>>
> >>>> The above serialization is recursive.
> >>>>
> >>>> However, there is indeed an problem of backwards compatibility when
> the
> >>>> module path of the parent class changed. But I think this is an rare
> >> case
> >>>> and acceptable. i.e., For Flink framework we never change the user
> >>>> interface module path if we want to keep backwards compatibility. For
> >> user
> >>>> code, if they change the interface of UDF's parent, they should
> >> re-register
> >>>> their functions.
> >>>>
> >>>> If we do not want support #2, we can store the full path of class and
> >>>> function, in that case we have no backwards compatibility problem.
> But I
> >>>> think the #2 is very convenient for users.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Regarding 4:
> >>>> As I mentioned earlier, there may be built-in Python functions and I
> >> think
> >>>> language is a "function" concept. Function and Language are orthogonal
> >>>> concepts.
> >>>> We may have R, GO and other language functions in the future, not only
> >>>> user-defined, but also built-in functions.
> >>>>
> >>>> You are right that users will not set this method and for Python
> >> functions,
> >>>> it will be set in the code-generated Java function by the framework.
> >> So, I
> >>>> think we should declare the getLanguage() in FunctionDefinition for
> now.
> >>>> (I'm not pretty sure what do you mean by saying that getKind() is
> final
> >> in
> >>>> UserDefinedFunction?)
> >>>>
> >>>> Best,
> >>>> Jincheng
> >>>>
> >>>> Timo Walther <[hidden email]> 于2019年9月3日周二 下午6:01写道:
> >>>>
> >>>>> Hi Jincheng,
> >>>>>
> >>>>> thanks for your response.
> >>>>>
> >>>>> 2. Serializability of functions: Using some arbitrary serialization
> >>>>> format for shipping a function to worker sounds fine to me. But once
> we
> >>>>> store functions a the catalog we need to think about backwards
> >>>>> compatibility and evolution of interfaces etc. I'm not sure if
> >>>>> CloudPickle is the right long-term storage format for this. If we
> don't
> >>>>> think about this in advance, we are basically violating our code
> >> quality
> >>>>> guide [1] of never use Java Serialization but in the Python-way. We
> are
> >>>>> using the RPC serialization for persistence.
> >>>>>
> >>>>> 3. TableEnvironment: Can you add some example to the FLIP? Because
> API
> >>>>> code like the following is not covered there:
> >>>>>
> >>>>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
> >>>>> DataTypes.BIGINT(),
> >>>>>                                             DataTypes.BIGINT()))
> >>>>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
> >>>>> DataTypes.BIGINT(),
> >>>>> DataTypes.BIGINT()))
> >>>>> self.t_env.register_function("add", add)
> >>>>>
> >>>>> 4. FunctionDefinition: Your response still doesn't answer my question
> >>>>> entirely. Why do we need FunctionDefinition.getLanguage() if this is
> a
> >>>>> "user-defined function" concept and not a "function" concept. In any
> >>>>> case, all users should not be able to set this method. So it must be
> >>>>> final in UserDefinedFunction similar to getKind().
> >>>>>
> >>>>> 5. Function characteristics: If UserDefinedFunction is defined in
> >>>>> Python, why is it not used in your example in FLIP-58. You could you
> >>>>> extend the example to show how to specify these attributes in the
> FLIP?
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>> [1]
> >> https://flink.apache.org/contributing/code-style-and-quality-java.html
> >>>>>
> >>>>> On 02.09.19 15:35, jincheng sun wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> Great thanks for your feedback. I would like to share my thoughts
> with
> >>>>> you
> >>>>>> inline. :)
> >>>>>>
> >>>>>> Best,
> >>>>>> Jincheng
> >>>>>>
> >>>>>> Timo Walther <[hidden email]> 于2019年9月2日周一 下午5:04写道:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> the FLIP looks awesome. However, I would like to discuss the
> changes
> >> to
> >>>>>>> the user-facing parts again. Some feedback:
> >>>>>>>
> >>>>>>> 1. DataViews: With the current non-annotation design for DataViews,
> >> we
> >>>>>>> cannot perform eager state declaration, right? At which point
> during
> >>>>>>> execution do we know which state is required by the function? We
> >> need to
> >>>>>>> instantiate the function first, right?
> >>>>>>>
> >>>>>>>> We will analysis the Python AggregateFunction and extract the
> >> DataViews
> >>>>>> used in the Python AggregateFunction. This can be done
> >>>>>> by instantiate a Python AggregateFunction, creating an accumulator
> by
> >>>>>> calling method create_accumulator and then analysis the created
> >>>>>> accumulator. This is actually similar to the way that Java
> >>>>>> AggregateFunction processing codegen logic. The extracted DataViews
> >> can
> >>>>>> then be used to construct the StateDescriptors in the operator,
> i.e.,
> >> we
> >>>>>> should have hold the state spec and the state descriptor id in Java
> >>>>>> operator and Python worker can access the state by specifying the
> >>>>>> corresponding state descriptor id.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> 2. Serializability of functions: How do we ensure serializability
> of
> >>>>>>> functions for catalog persistence? In the Scala/Java API, we would
> >> like
> >>>>>>> to register classes instead of instances soon. This is the only way
> >> to
> >>>>>>> store a function properly in a catalog or we need some
> >>>>>>> serialization/deserialization logic in the function interfaces to
> >>>>>>> convert an instance to string properties.
> >>>>>>>
> >>>>>>>> The Python function will be serialized with CloudPickle anyway in
> >> the
> >>>>>> Python API as we need to transfer it to the Python worker which can
> >> then
> >>>>>> deserialize it for execution. The serialized Python function can be
> >>>>> stored
> >>>>>> into catalog.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> 3. TableEnvironment: What is the signature of
> >> `register_function(self,
> >>>>>>> name, function)`? Does it accept both a class and function? Like
> >> `class
> >>>>>>> Sum` and `def split()`? Could you add some examples for registering
> >> both
> >>>>>>> kinds of functions?
> >>>>>>>
> >>>>>>>> It has been already supported which you mentioned. You can find an
> >>>>>> example in the POC code:
> >>>>>>
> >>>>>
> >>
> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
> >>>>>>
> >>>>>>
> >>>>>>> 4. FunctionDefinition: Function definition is not a user-defined
> >>>>>>> function definition. It is the highest interface for both
> >> user-defined
> >>>>>>> and built-in functions. I'm not sure if getLanguage() should be
> part
> >> of
> >>>>>>> this interface or one-level down which would be
> >> `UserDefinedFunction`.
> >>>>>>> Built-in functions will never be implemented in a different
> >> language. In
> >>>>>>> any case, I would vote for removing the UNKNOWN language, because
> it
> >>>>>>> does not solve anything. Why should a user declare a function that
> >> the
> >>>>>>> runtime can not handle? I also find the term `JAVA` confusing for
> >> Scala
> >>>>>>> users. How about `FunctionLanguage.JVM` instead?
> >>>>>>>
> >>>>>>>> Actually we may have built-in Python functions in the future.
> >> Regarding
> >>>>>> to the following expression: py_udf1(a, b) + py_udf2(c), if there is
> >>>>>> built-in Python
> >>>>>> funciton for '+' operator, then we don't need to mix using Java and
> >>>>> Python
> >>>>>> UDFs. In this way, we can improve the execution performance.
> >>>>>> Regarding to removing FunctionLanguage.UNKNOWN and renaming
> >>>>>> FunctionLanguage.Java to FunctionLanguage.JVM, it makes more sense
> to
> >> me.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> 5. Function characteristics: In the current design, function
> classes
> >> do
> >>>>>>> not extend from any upper class. How can users declare
> >> characteristics
> >>>>>>> that are present in `FunctionDefinition` like determinism,
> >> requirements,
> >>>>>>> or soon also monotonism.
> >>>>>>>
> >>>>>>>> Actually we have defined 'UserDefinedFunction' which is the base
> >> class
> >>>>>> for all user-defined functions.
> >>>>>> We can define the deterministic, requirements, etc in this class.
> >>>>>> Currently, we have already supported to define the deterministic.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> Thanks,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>>
> >>>>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
> >>>>>>>> Hi Jincheng, Fudian, and Aljoscha,
> >>>>>>>> I am assuming the proposed python UDX can also be applied to Flink
> >> SQL.
> >>>>>>>> Is this correct? If yes, I would suggest to title the FLIP as
> "Flink
> >>>>>>> Python
> >>>>>>>> User-Defined Function" or "Flink Python User-Defined Function for
> >>>>> Table".
> >>>>>>>> Regards,
> >>>>>>>> Shaoxuan
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <
> >>>>> [hidden email]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for the feedback Bowen!
> >>>>>>>>>
> >>>>>>>>> Great thanks for create the FLIP and bring up the VOTE Dian!
> >>>>>>>>>
> >>>>>>>>> Best, Jincheng
> >>>>>>>>>
> >>>>>>>>> Dian Fu <[hidden email]> 于2019年8月28日周三 上午11:32写道:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I have started a voting thread [1]. Thanks a lot for your help
> >> during
> >>>>>>>>>> creating the FLIP @Jincheng.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Hi Bowen,
> >>>>>>>>>>
> >>>>>>>>>> Very appreciated for your comments. I have replied you in the
> >> design
> >>>>>>> doc.
> >>>>>>>>>> As it seems that the comments doesn't affect the overall design,
> >> I'll
> >>>>>>> not
> >>>>>>>>>> cancel the vote for now and we can continue the discussion in
> the
> >>>>>>> design
> >>>>>>>>>> doc.
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>>
> >>>>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
> >>>>>>>>>> <
> >>>>>>>>>>
> >>>>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
> >>>>>>>>>> Regards,
> >>>>>>>>>> Dian
> >>>>>>>>>>
> >>>>>>>>>>> 在 2019年8月28日,上午11:05,Bowen Li <[hidden email]> 写道:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Jincheng and Dian,
> >>>>>>>>>>>
> >>>>>>>>>>> Sorry for being late to the party. I took a glance at the
> >> proposal,
> >>>>>>>>> LGTM
> >>>>>>>>>> in
> >>>>>>>>>>> general, and I left only a couple comments.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Bowen
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]
> >
> >>>>>>> wrote:
> >>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks! It works.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Dian
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 在 2019年8月27日,上午10:55,jincheng sun <[hidden email]>
> >> 写道:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Dian Fu <[hidden email]> 于2019年8月26日周一 上午10:52写道:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Appreciated for the kind tips and offering of help.
> Definitely
> >>>>> need
> >>>>>>>>>> it!
> >>>>>>>>>>>>>> Could you grant me write permission for confluence? My Id:
> >> Dian
> >>>>> Fu
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 在 2019年8月26日,上午9:53,jincheng sun <[hidden email]
> >
> >> 写道:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for your feedback Hequn & Dian.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
> >>>>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you complete
> >>>>>>>>>>>>>>> your first FLIP. Here are some tips:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
> >>>>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP Template [1]
> >>>>>>>>>>>>>>>   (it also helps to read [2] to learn more about FLIPs).
> >>>>>>>>>>>>>>> - Create the JIRAs related to Flink Python UDFs after the VOTE on the
> >>>>>>>>>>>>>>>   FLIP has completed. (I think you can also bring up the VOTE thread
> >>>>>>>>>>>>>>>   yourself, if you want!)
> >>>>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell me
> >>>>>>>>>>>>>>> so that we can solve them together. :)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> >>>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >>>>>>>>>>>>>>> Hequn Cheng <[hidden email]> wrote on Fri, Aug 23, 2019 at 11:54 AM:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> +1 for starting the vote.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks a lot, Jincheng, for the discussion.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best, Hequn
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> +1 to start the FLIP creation and the VOTE on this feature. I'm willing
> >>>>>>>>>>>>>>>>> to help with creating the FLIP if you don't mind. As I haven't created
> >>>>>>>>>>>>>>>>> a FLIP before, it would be great if you could help with this. :)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions and
> >>>>>>>>>>>>>>>>>> comments, I think it's better to initiate a vote to create a FLIP for
> >>>>>>>>>>>>>>>>>> Apache Flink Python UDFs.
> >>>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best, Jincheng
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> jincheng sun <[hidden email]> wrote on Thu, Aug 15, 2019 at 12:54 AM:
> >>>>>>>>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder about
> >>>>>>>>>>>>>>>>>>> bundle processing.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I have added a description of how bundle processing is performed from
> >>>>>>>>>>>>>>>>>>> the perspective of checkpoints and watermarks. Feel free to leave
> >>>>>>>>>>>>>>>>>>> comments if anything is not described clearly.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Dian Fu <[hidden email]> wrote on Wed, Aug 14, 2019 at 10:08 AM:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint" [1] in
> >>>>>>>>>>>>>>>>>>>> the design doc which talks about how to handle checkpoints.
> >>>>>>>>>>>>>>>>>>>> However, I think you are right that we should talk more about it, such
> >>>>>>>>>>>>>>>>>>>> as what bundle processing is, how it affects checkpoints and
> >>>>>>>>>>>>>>>>>>>> watermarks, how to handle checkpoints and watermarks, etc.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Aug 14, 2019, at 1:01 AM, Thomas Weise <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is very detailed,
> >>>>>>>>>>>>>>>>>>>>> thorough and, for me as a Beam Flink runner contributor, easy to
> >>>>>>>>>>>>>>>>>>>>> understand. :)
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is the bundle
> >>>>>>>>>>>>>>>>>>>>> processing. It is critically important for performance that multiple
> >>>>>>>>>>>>>>>>>>>>> elements are processed in a bundle. The default bundle size in the
> >>>>>>>>>>>>>>>>>>>>> Flink runner is 1s or 1000 elements, whichever comes first. And for
> >>>>>>>>>>>>>>>>>>>>> streaming, you can find the logic necessary to align the bundle
> >>>>>>>>>>>>>>>>>>>>> processing with watermarks and checkpointing here:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thomas
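Thomas's point about bundling can be illustrated with a small, self-contained sketch. It assumes a hypothetical `Bundler` class, which is not a Flink or Beam API. The class buffers elements and emits a bundle once either a size limit (default 1000 elements) or a time limit (default 1 s) is reached, whichever comes first. `finish()` is meant to be called before a checkpoint barrier or watermark is forwarded, so that no element stays buffered across it. The clock is injectable so the behaviour can be tested deterministically.

```python
import time
from typing import Callable, List, Optional


class Bundler:
    """Hypothetical buffer that groups elements into bundles.

    A bundle is finished when it holds `max_size` elements or when
    `max_time_s` seconds have passed since its first element, whichever
    comes first. Callers should also call `finish()` before forwarding a
    checkpoint barrier or a watermark, so no element is left in flight.
    """

    def __init__(self, max_size: int = 1000, max_time_s: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_size = max_size
        self.max_time_s = max_time_s
        self.clock = clock
        self._buffer: List[object] = []
        self._started_at: Optional[float] = None

    def add(self, element: object) -> Optional[List[object]]:
        """Buffer an element; return the finished bundle if a limit was hit."""
        if not self._buffer:
            self._started_at = self.clock()
        self._buffer.append(element)
        if (len(self._buffer) >= self.max_size
                or self.clock() - self._started_at >= self.max_time_s):
            return self.finish()
        return None

    def finish(self) -> List[object]:
        """Force the current bundle out (e.g. on a checkpoint or watermark)."""
        bundle, self._buffer, self._started_at = self._buffer, [], None
        return bundle
```

In this sketch the time limit is only checked when a new element arrives. A real operator would presumably also register a processing-time timer, so that a bundle on an idle stream still gets flushed.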
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already supported
> >>>>>>>>>>>>>>>>>>>>>> and will be available in the upcoming 1.9 release.
> >>>>>>>>>>>>>>>>>>>>>> As Python UDFs are very important for Python users, we'd like to start
> >>>>>>>>>>>>>>>>>>>>>> the discussion about Python UDF support in the Python Table API.
> >>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek, Dian Fu and I have discussed this offline and have
> >>>>>>>>>>>>>>>>>>>>>> drafted a design doc [1]. It includes the following items:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
> >>>>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> As mentioned by many people in the previous discussion thread [2], a
> >>>>>>>>>>>>>>>>>>>>>> portability framework was introduced in the latest releases of Apache
> >>>>>>>>>>>>>>>>>>>>>> Beam. It provides well-defined, language-neutral data structures and
> >>>>>>>>>>>>>>>>>>>>>> protocols for language-neutral user-defined function execution. This
> >>>>>>>>>>>>>>>>>>>>>> design is based on Beam's portability framework. We will describe how
> >>>>>>>>>>>>>>>>>>>>>> to make use of Beam's portability framework for user-defined function
> >>>>>>>>>>>>>>>>>>>>>> execution: data transmission, state access, checkpoints, metrics,
> >>>>>>>>>>>>>>>>>>>>>> logging, etc.
> >>>>>>>>>>>>>>>>>>>>>> Considering that the design relies on Beam's portability framework for
> >>>>>>>>>>>>>>>>>>>>>> Python user-defined function execution, and not all contributors in the
> >>>>>>>>>>>>>>>>>>>>>> Flink community are familiar with Beam's portability framework, we have
> >>>>>>>>>>>>>>>>>>>>>> built a prototype [3] as a proof of concept and to make the design
> >>>>>>>>>>>>>>>>>>>>>> easier to understand.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
> >>>>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>
> >>>
> >>
> >>
>
>

Re: [DISCUSS] Flink Python User-Defined Function for Table API

Aljoscha Krettek-2
Hi,

Regarding stateful functions and MapView/DataView/ListView: I think it’s best to keep that for a later FLIP and focus on a more basic version. Supporting stateful functions, especially with MapView, can potentially be very slow, so we have to see what we can do there.

For the method names, I don’t know. If FLIP-64 passes, they will have to be changed. So we could use the final names right away, but I’m also fine with using the old method names for now.

Best,
Aljoscha

> On 5. Sep 2019, at 12:40, jincheng sun <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> Thanks for your comments!
>
> Regarding the FLIP scope, it seems that we have agreed on the design of
> the stateless function support.
> What do you think about starting development of the stateless function
> support first and continuing the discussion of stateful function support?
> Or do you think we should split the current FLIP into two FLIPs and discuss
> the stateful function support in another thread?
>
> Currently, the design of the Python DataView/MapView/ListView interfaces
> follows the Java/Scala naming conventions.
> Of course, we can continue to discuss whether there are better solutions,
> e.g., using annotations.
>
> Regarding the magic logic to support DataView/MapView/ListView, it will
> be done by the framework and is transparent to users.
> Per my understanding, the magic logic is unavoidable no matter what the
> interfaces will be.
>
> Regarding catalog support for Python functions:
> 1) If it's stored in memory as a temporary object, just as you said, users
> can call TableEnvironment.register_function (which will change to
> register_temporary_function in FLIP-64).
> 2) If it's persisted in external storage, users can call
> Catalog.create_function. There will be no API change, per my understanding.
>
> What do you think?
> Best,
> Jincheng
>
> Aljoscha Krettek <[hidden email]> wrote on Thu, Sep 5, 2019 at 5:32 PM:
>
>> Hi,
>>
>> Another thing to consider is the scope of the FLIP. Currently, we try to
>> support (stateful) AggregateFunctions. I have some concerns about whether
>> or not DataView/MapView/ListView is a good interface because it requires
>> quite some magic from the runners to make it work, such as messing with the
>> TypeInformation and injecting objects at runtime. If the FLIP aims for the
>> minimum of ScalarFunctions and the whole execution harness, that should be
>> easier to agree on.
>>
>> Another point is the naming of the new methods. I think Timo hinted at the
>> fact that we have to consider catalog support for functions. There is
>> ongoing work about differentiating between temporary objects and objects
>> that are stored in a catalog (FLIP-64 [1]). With this in mind, the method
>> for registering functions should be called register_temporary_function()
>> and so on. Unless we want to already think about mixing Python and Java
>> functions in the catalog, which is outside the scope of this FLIP, I think.
>>
>> Best,
>> Aljoscha
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
>>
>>
>>> On 5. Sep 2019, at 05:01, jincheng sun <[hidden email]> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> That's a good point. So far, most of the code will live in the flink-python
>>> module, and the rules and RelNodes will be put into both the Blink and
>>> Flink planner modules; some common interfaces required by the planners
>>> will be placed in flink-table-common. I think you are right that we should
>>> try to keep the changes for this feature minimal. We will follow this
>>> principle when reviewing the PRs.
>>>
>>> Many thanks for your questions and the reminder!
>>>
>>> Best,
>>> Jincheng
>>>
>>>
>> Aljoscha Krettek <[hidden email]> wrote on Wed, Sep 4, 2019 at 8:58 PM:
>>>
>>>> Hi,
>>>>
>>>> Things look interesting so far!
>>>>
>>>> I had one question: Where will most of the support code for this live?
>>>> Will this add the required code to flink-table-common or the different
>>>> runners? Can we implement this in such a way that only a minimal amount of
>>>> support code is required in the parts of the Table API (and Table API
>>>> runners) that are not Python-specific?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>> On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
>>>>>
>>>>> Hi Jincheng,
>>>>>
>>>>> 2. Serializability of functions: "#2 is very convenient for users" means
>>>>> only until they have the first backwards-compatibility issue; after that
>>>>> they will not find it so convenient anymore and will ask why the framework
>>>>> allowed storing such objects in persistent storage. I don't want to be
>>>>> picky about it, but wanted to raise awareness that sometimes it is ok to
>>>>> limit use cases to guide users toward developing backwards-compatible
>>>>> programs.
>>>>>
>>>>> Thanks for the explanation of the remaining items. It sounds reasonable
>>>>> to me. Regarding the example with `getKind()`, I actually meant
>>>>> `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow
>>>>> users to override this property. And I think we should do something
>>>>> similar for the getLanguage property.
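Python has no `final` keyword, but Timo's request that users must not override the language property can be approximated by rejecting such overrides when a subclass is defined. This is a hypothetical sketch: the names `FunctionLanguage`, `UserDefinedFunction`, `get_language` and `is_deterministic` mirror the discussion but are not taken from the PyFlink code base, and `JVM` follows the renaming proposed in this thread. It also shows a characteristic, determinism, that users *are* allowed to override.

```python
from enum import Enum


class FunctionLanguage(Enum):
    JVM = "JVM"
    PYTHON = "PYTHON"


class UserDefinedFunction:
    """Hypothetical base class for Python UDFs."""

    # Methods that subclasses are not allowed to redefine.
    _FINAL_METHODS = ("get_language",)

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        for name in UserDefinedFunction._FINAL_METHODS:
            if name in cls.__dict__:
                raise TypeError(f"{cls.__name__} must not override {name}()")

    def get_language(self) -> FunctionLanguage:
        # Fixed by the framework for every Python function.
        return FunctionLanguage.PYTHON

    def is_deterministic(self) -> bool:
        # A characteristic that users may override, e.g. for random values.
        return True


class RandomInt(UserDefinedFunction):
    def is_deterministic(self) -> bool:
        return False
```

With this approach, a subclass that defines its own `get_language` fails with `TypeError` as soon as the class statement runs. Overriding `is_deterministic` remains allowed.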
>>>>>
>>>>> Thanks,
>>>>> Timo
>>>>>
>>>>> On 03.09.19 15:01, jincheng sun wrote:
>>>>>> Hi Timo,
>>>>>>
>>>>>> Thanks for the quick reply! :)
>>>>>> I have added more examples for #3 and #5 to the FLIP. Those are great
>>>>>> suggestions!
>>>>>>
>>>>>> Regarding 2:
>>>>>>
>>>>>> There are two kinds of serialization in CloudPickle (which is different
>>>>>> from Java):
>>>>>> 1) For classes and functions which can be imported, CloudPickle only
>>>>>> serializes the full path of the class or function (just like a Java class
>>>>>> name).
>>>>>> 2) For classes and functions which cannot be imported, CloudPickle
>>>>>> serializes the full content of the class or function.
>>>>>> For #2, this means that we cannot just store the full path of the class
>>>>>> and function.
>>>>>>
>>>>>> The above serialization is recursive.
>>>>>>
>>>>>> However, there is indeed a backwards-compatibility problem when the
>>>>>> module path of the parent class changes. But I think this is a rare case
>>>>>> and acceptable. For example, in the Flink framework we never change the
>>>>>> module path of user-facing interfaces if we want to keep backwards
>>>>>> compatibility. For user code, if users change the interface of a UDF's
>>>>>> parent, they should re-register their functions.
>>>>>>
>>>>>> If we do not want to support #2, we can store the full path of the class
>>>>>> and function; in that case we have no backwards-compatibility problem.
>>>>>> But I think #2 is very convenient for users.
>>>>>>
>>>>>> What do you think?
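The by-reference vs. by-value distinction jincheng describes can be observed with Python's standard `pickle` module, which supports only the by-reference mode for functions. The payload records the module and qualified name, and unpickling re-imports it, which is why moving a module later breaks old payloads. The sketch below uses only the standard library; CloudPickle itself is not used here. A lambda cannot be looked up by name, so plain pickle rejects it. That is exactly case 2 in jincheng's list, where CloudPickle serializes the function's code instead.

```python
import json
import pickle


def can_pickle_by_reference(obj) -> bool:
    """Return False when plain pickle cannot locate `obj` by import path."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


# An importable function is stored as "module + qualified name" only.
payload = pickle.dumps(json.dumps)
restored = pickle.loads(payload)  # re-imports `json` and looks up `dumps`
print(b"json" in payload, b"dumps" in payload, restored is json.dumps)

# A lambda has no importable name, so by-reference pickling fails.
print(can_pickle_by_reference(json.dumps), can_pickle_by_reference(lambda i: i + 1))
```

This is the trade-off Timo raises: a by-reference payload is small and stable only as long as the import path stays the same. A by-value payload avoids the import but embeds code, which is harder to evolve once persisted in a catalog.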
>>>>>>
>>>>>> Regarding 4:
>>>>>> As I mentioned earlier, there may be built-in Python functions, and I
>>>>>> think language is a "function" concept. Function and language are
>>>>>> orthogonal concepts.
>>>>>> We may have R, Go and other language functions in the future, not only
>>>>>> user-defined but also built-in functions.
>>>>>>
>>>>>> You are right that users will not set this method; for Python functions,
>>>>>> it will be set by the framework in the code-generated Java function. So I
>>>>>> think we should declare getLanguage() in FunctionDefinition for now.
>>>>>> (I'm not quite sure what you mean by saying that getKind() is final in
>>>>>> UserDefinedFunction?)
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>> Timo Walther <[hidden email]> wrote on Tue, Sep 3, 2019 at 6:01 PM:
>>>>>>
>>>>>>> Hi Jincheng,
>>>>>>>
>>>>>>> thanks for your response.
>>>>>>>
>>>>>>> 2. Serializability of functions: Using some arbitrary serialization
>>>>>>> format for shipping a function to a worker sounds fine to me. But once
>>>>>>> we store functions in the catalog, we need to think about backwards
>>>>>>> compatibility and evolution of interfaces, etc. I'm not sure if
>>>>>>> CloudPickle is the right long-term storage format for this. If we don't
>>>>>>> think about this in advance, we are basically violating our code quality
>>>>>>> guide [1] of never using Java serialization, just in the Python way: we
>>>>>>> would be using the RPC serialization for persistence.
>>>>>>>
>>>>>>> 3. TableEnvironment: Can you add some examples to the FLIP? API code
>>>>>>> like the following is not covered there:
>>>>>>>
>>>>>>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
>>>>>>>                                             DataTypes.BIGINT(),
>>>>>>>                                             DataTypes.BIGINT()))
>>>>>>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
>>>>>>>                                                  DataTypes.BIGINT(),
>>>>>>>                                                  DataTypes.BIGINT()))
>>>>>>> self.t_env.register_function("add", add)
>>>>>>>
>>>>>>> 4. FunctionDefinition: Your response still doesn't answer my question
>>>>>>> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
>>>>>>> "user-defined function" concept and not a "function" concept? In any
>>>>>>> case, users should not be able to set this method. So it must be
>>>>>>> final in UserDefinedFunction, similar to getKind().
>>>>>>>
>>>>>>> 5. Function characteristics: If UserDefinedFunction is defined in
>>>>>>> Python, why is it not used in your example in FLIP-58? Could you
>>>>>>> extend the example to show how to specify these attributes in the FLIP?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
>>>>>>>
>>>>>>> On 02.09.19 15:35, jincheng sun wrote:
>>>>>>>> Hi Timo,
>>>>>>>>
>>>>>>>> Many thanks for your feedback. I would like to share my thoughts with
>>>>>>>> you inline. :)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jincheng
>>>>>>>>
>>>>>>>> Timo Walther <[hidden email]> wrote on Mon, Sep 2, 2019 at 5:04 PM:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> the FLIP looks awesome. However, I would like to discuss the changes to
>>>>>>>>> the user-facing parts again. Some feedback:
>>>>>>>>>
>>>>>>>>> 1. DataViews: With the current non-annotation design for DataViews, we
>>>>>>>>> cannot perform eager state declaration, right? At which point during
>>>>>>>>> execution do we know which state is required by the function? We need
>>>>>>>>> to instantiate the function first, right?
>>>>>>>>>
>>>>>>>> We will analyze the Python AggregateFunction and extract the DataViews
>>>>>>>> used in it. This can be done by instantiating the Python
>>>>>>>> AggregateFunction, creating an accumulator by calling the method
>>>>>>>> create_accumulator, and then analyzing the created accumulator. This is
>>>>>>>> actually similar to the way the Java AggregateFunction codegen logic
>>>>>>>> works. The extracted DataViews can then be used to construct the
>>>>>>>> StateDescriptors in the operator, i.e., the Java operator should hold
>>>>>>>> the state spec and the state descriptor id, and the Python worker can
>>>>>>>> access the state by specifying the corresponding state descriptor id.
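The extraction step jincheng describes (instantiate the aggregate function, call `create_accumulator`, inspect the result) can be sketched as follows. Everything here is hypothetical. `ListView`, `MapView`, `CountDistinct` and `StateSpec` are illustrative stand-ins, not PyFlink classes, and the accumulator is assumed to be a flat Python list whose fields may hold views. Each discovered view gets a state descriptor id that a Java operator and the Python worker could both use to address the same state.

```python
from dataclasses import dataclass
from typing import List


class ListView:
    """Hypothetical marker for list-typed state inside an accumulator."""


class MapView:
    """Hypothetical marker for map-typed state inside an accumulator."""


@dataclass
class StateSpec:
    state_id: str     # descriptor id shared by Java operator and Python worker
    field_index: int  # position of the view inside the accumulator
    kind: str         # "list" or "map"


class CountDistinct:
    """Example aggregate whose accumulator is [MapView, count]."""

    def create_accumulator(self) -> list:
        return [MapView(), 0]


def extract_data_views(agg_function, agg_index: int = 0) -> List[StateSpec]:
    # Instantiate an accumulator and look for view-typed fields in it.
    accumulator = agg_function.create_accumulator()
    specs = []
    for i, field_value in enumerate(accumulator):
        if isinstance(field_value, (ListView, MapView)):
            kind = "map" if isinstance(field_value, MapView) else "list"
            specs.append(StateSpec(f"agg{agg_index}$view{i}", i, kind))
    return specs
```

As Timo's question implies, this approach needs a function instance before the state can be declared. The specs are known only after `create_accumulator()` has run once, not from the class definition alone.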
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> 2. Serializability of functions: How do we ensure serializability of
>>>>>>>>> functions for catalog persistence? In the Scala/Java API, we would like
>>>>>>>>> to register classes instead of instances soon. This is the only way to
>>>>>>>>> store a function properly in a catalog, or we need some
>>>>>>>>> serialization/deserialization logic in the function interfaces to
>>>>>>>>> convert an instance to string properties.
>>>>>>>>>
>>>>>>>> The Python function will be serialized with CloudPickle anyway in the
>>>>>>>> Python API, as we need to transfer it to the Python worker, which can
>>>>>>>> then deserialize it for execution. The serialized Python function can be
>>>>>>>> stored in the catalog.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> 3. TableEnvironment: What is the signature of `register_function(self,
>>>>>>>>> name, function)`? Does it accept both a class and a function? Like
>>>>>>>>> `class Sum` and `def split()`? Could you add some examples for
>>>>>>>>> registering both kinds of functions?
>>>>>>>>>
>>>>>>>> What you mentioned is already supported. You can find an example in the
>>>>>>>> POC code:
>>>>>>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
>>>>>>>>
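Timo's question can be made concrete with a registration method that accepts both kinds of callables and normalizes them to one invocation path. The following toy registry is purely hypothetical. It is not the PyFlink implementation, and it ignores type declarations such as `DataTypes.BIGINT()`. It accepts a plain function or lambda, or an instance of a class that defines an `eval` method, mirroring the `SubtractOne()` style in Timo's example.

```python
from typing import Any, Callable, Dict


class ToyFunctionRegistry:
    """Hypothetical stand-in for TableEnvironment.register_function()."""

    def __init__(self) -> None:
        self._functions: Dict[str, Callable[..., Any]] = {}

    def register_function(self, name: str, function: Any) -> None:
        # Class-based UDFs expose an `eval` method (like ScalarFunction.eval);
        # plain functions and lambdas are called directly.
        if hasattr(function, "eval") and callable(function.eval):
            self._functions[name] = function.eval
        elif callable(function):
            self._functions[name] = function
        else:
            raise TypeError(f"{name!r} is neither callable nor has eval()")

    def call(self, name: str, *args: Any) -> Any:
        return self._functions[name](*args)


class SubtractOne:
    def eval(self, i: int) -> int:
        return i - 1


def add(i: int, j: int) -> int:
    return i + j


registry = ToyFunctionRegistry()
registry.register_function("add_one", lambda i: i + 1)
registry.register_function("subtract_one", SubtractOne())
registry.register_function("add", add)
```

Checking for `eval` before `callable` matters here. A class instance could also define `__call__`, and this sketch prefers the explicit UDF entry point in that case.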
>>>>>>>>
>>>>>>>>> 4. FunctionDefinition: A function definition is not a user-defined
>>>>>>>>> function definition. It is the highest interface for both user-defined
>>>>>>>>> and built-in functions. I'm not sure if getLanguage() should be part of
>>>>>>>>> this interface or one level down, which would be `UserDefinedFunction`.
>>>>>>>>> Built-in functions will never be implemented in a different language.
>>>>>>>>> In any case, I would vote for removing the UNKNOWN language, because it
>>>>>>>>> does not solve anything. Why should a user declare a function that the
>>>>>>>>> runtime cannot handle? I also find the term `JAVA` confusing for Scala
>>>>>>>>> users. How about `FunctionLanguage.JVM` instead?
>>>>>>>>>
>>>>>>>> Actually, we may have built-in Python functions in the future. Regarding
>>>>>>>> the expression py_udf1(a, b) + py_udf2(c): if there is a built-in Python
>>>>>>>> function for the '+' operator, then we don't need to mix Java and Python
>>>>>>>> functions. In this way, we can improve the execution performance.
>>>>>>>> Regarding removing FunctionLanguage.UNKNOWN and renaming
>>>>>>>> FunctionLanguage.Java to FunctionLanguage.JVM, that makes sense to me.
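jincheng's `py_udf1(a, b) + py_udf2(c)` argument can be made concrete by counting how often evaluation crosses from one language to another. The sketch below is a hypothetical model, not planner code. Each call node carries a language tag, and every parent/child pair with different languages counts as one hand-over between the JVM and the Python worker.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Call:
    name: str
    language: str  # "JVM" or "PYTHON"
    args: List["Call"] = field(default_factory=list)


def language_boundaries(node: Call) -> int:
    """Count parent/child edges whose languages differ."""
    count = 0
    for child in node.args:
        if child.language != node.language:
            count += 1
        count += language_boundaries(child)
    return count


def expression(plus_language: str) -> Call:
    # py_udf1(a, b) + py_udf2(c); column references are omitted for brevity.
    return Call("+", plus_language,
                [Call("py_udf1", "PYTHON"), Call("py_udf2", "PYTHON")])


print(language_boundaries(expression("JVM")))     # 2
print(language_boundaries(expression("PYTHON")))  # 0
```

With a JVM `+`, both UDF results have to be handed back to the JVM. With a built-in Python `+`, the whole expression could stay inside one Python worker.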
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> 5. Function characteristics: In the current design, function classes do
>>>>>>>>> not extend from any upper class. How can users declare characteristics
>>>>>>>>> that are present in `FunctionDefinition` like determinism, requirements,
>>>>>>>>> or soon also monotonism?
>>>>>>>>>
>>>>>>>> Actually, we have defined 'UserDefinedFunction', which is the base class
>>>>>>>> for all user-defined functions. We can define determinism, requirements,
>>>>>>>> etc. in this class. Currently, we already support defining determinism.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
>>>>>>>>>> Hi Jincheng, Fudian, and Aljoscha,
>>>>>>>>>> I am assuming the proposed Python UDX can also be applied to Flink SQL.
>>>>>>>>>> Is this correct? If yes, I would suggest titling the FLIP "Flink Python
>>>>>>>>>> User-Defined Function" or "Flink Python User-Defined Function for Table".
>>>>>>>>>> Regards,
>>>>>>>>>> Shaoxuan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the feedback Bowen!
>>>>>>>>>>>
>>>>>>>>>>> Many thanks for creating the FLIP and bringing up the VOTE, Dian!
>>>>>>>>>>>
>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>
>>>>>>>>>>> Dian Fu <[hidden email]> wrote on Wed, Aug 28, 2019 at 11:32 AM:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have started a voting thread [1]. Thanks a lot for your help in
>>>>>>>>>>>> creating the FLIP, @Jincheng.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Bowen,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks very much for your comments. I have replied to you in the
>>>>>>>>>>>> design doc. As it seems that the comments don't affect the overall
>>>>>>>>>>>> design, I won't cancel the vote for now, and we can continue the
>>>>>>>>>>>> discussion in the design doc.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Dian
>>>>>>>>>>>>
>>>>>>>>>>>> On Aug 28, 2019, at 11:05 AM, Bowen Li <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jincheng and Dian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sorry for being late to the party. I took a glance at the proposal,
>>>>>>>>>>>>> LGTM in general, and I left only a couple of comments.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Bowen
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks! It works.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Aug 27, 2019, at 10:55 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dian Fu <[hidden email]> wrote on Mon, Aug 26, 2019 at 10:52 AM:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I appreciate the kind tips and the offer of help. I definitely need it!
>>>>>>>>>>>>>>>> Could you grant me write permission for Confluence? My ID: Dian Fu
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Aug 26, 2019, at 9:53 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your feedback Hequn & Dian.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
>>>>>>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you complete
>>>>>>>>>>>>>>>>> your first FLIP. Here are some tips:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
>>>>>>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP Template [1]
>>>>>>>>>>>>>>>>>   (it also helps to read [2] to learn more about FLIPs).
>>>>>>>>>>>>>>>>> - Create the JIRAs related to Flink Python UDFs after the VOTE on the
>>>>>>>>>>>>>>>>>   FLIP has completed. (I think you can also bring up the VOTE thread
>>>>>>>>>>>>>>>>>   yourself, if you want!)
>>>>>>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell me
>>>>>>>>>>>>>>>>> so that we can solve them together. :)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>>>>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>>>>>>>>>>>>>> Hequn Cheng <[hidden email]> wrote on Fri, Aug 23, 2019 at 11:54 AM:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> +1 for starting the vote.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks a lot, Jincheng, for the discussion.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best, Hequn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> +1 to start the FLIP creation and the VOTE on this feature. I'm willing
>>>>>>>>>>>>>>>>>>> to help with creating the FLIP if you don't mind. As I haven't created
>>>>>>>>>>>>>>>>>>> a FLIP before, it would be great if you could help with this. :)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions and
>>>>>>>>>>>>>>>>>>>> comments, I think it's better to initiate a vote to create a FLIP for
>>>>>>>>>>>>>>>>>>>> Apache Flink Python UDFs.
>>>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> jincheng sun <[hidden email]> wrote on Thu, Aug 15, 2019 at 12:54 AM:
>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder about
>>>>>>>>>>>>>>>>>>>>> bundle processing.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I have added a description of how bundle processing is performed from
>>>>>>>>>>>>>>>>>>>>> the perspective of checkpoints and watermarks. Feel free to leave
>>>>>>>>>>>>>>>>>>>>> comments if anything is not described clearly.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Dian Fu <[hidden email]> wrote on Wed, Aug 14, 2019 at 10:08 AM:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint" [1] in
>>>>>>>>>>>>>>>>>>>>>> the design doc which talks about how to handle checkpoints.
>>>>>>>>>>>>>>>>>>>>>> However, I think you are right that we should talk more about it, such
>>>>>>>>>>>>>>>>>>>>>> as what bundle processing is, how it affects checkpoints and
>>>>>>>>>>>>>>>>>>>>>> watermarks, how to handle checkpoints and watermarks, etc.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Aug 14, 2019, at 1:01 AM, Thomas Weise <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is
>> very
>>>>>>>>>>>> detailed,
>>>>>>>>>>>>>>>>>>>>>> thorough
>>>>>>>>>>>>>>>>>>>>>>> and for me as a Beam Flink runner contributor easy to
>>>>>>>>>>>> understand
>>>>>>>>>>>>>> :)
>>>>>>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is the
>>>>>>> bundle
>>>>>>>>>>>>>>>>>>>>>> processing. It
>>>>>>>>>>>>>>>>>>>>>>> is critically important for performance that multiple
>>>>>>>>>>> elements
>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>> processed in a bundle. The default bundle size in the
>>>>>>> Flink
>>>>>>>>>>>>>> runner
>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> 1s or
>>>>>>>>>>>>>>>>>>>>>>> 1000 elements, whichever comes first. And for
>>>> streaming,
>>>>>>> you
>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> find
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> logic necessary to align the bundle processing with
>>>>>>>>>>> watermarks
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>> checkpointing here:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>
>>>>
>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
>>>>>>>>>>>>>>>>>>>>>>> Thomas
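Thomas's default of "1s or 1000 elements, whichever comes first" amounts to a small trigger. Below is a minimal sketch of that rule; `BundleTrigger` is a hypothetical name and this is not the Beam runner's implementation. The clock is injectable so the time condition can be tested deterministically.

```python
import time
from typing import Callable, Optional


class BundleTrigger:
    """Finish a bundle after max_elements or max_millis, whichever comes first."""

    def __init__(self, max_elements: int = 1000, max_millis: int = 1000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_elements = max_elements
        self.max_millis = max_millis
        self.clock = clock  # returns seconds
        self.count = 0
        self.started_at: Optional[float] = None

    def on_element(self) -> bool:
        """Record one element; return True if the bundle should be finished now."""
        if self.started_at is None:
            self.started_at = self.clock()  # the bundle starts with its first element
        self.count += 1
        return self.should_finish()

    def should_finish(self) -> bool:
        if self.started_at is None:
            return False  # empty bundle: nothing to finish
        elapsed_ms = (self.clock() - self.started_at) * 1000.0
        return self.count >= self.max_elements or elapsed_ms >= self.max_millis

    def reset(self) -> None:
        self.count = 0
        self.started_at = None


now = [0.0]  # fake clock for the example
trigger = BundleTrigger(max_elements=3, max_millis=1000, clock=lambda: now[0])
print([trigger.on_element() for _ in range(3)])  # [False, False, True]
trigger.reset()
trigger.on_element()
now[0] = 1.5  # 1500 ms later
print(trigger.should_finish())  # True
```

`on_element` only runs when data arrives, so in a real operator a processing-time timer would also have to call `should_finish()`; otherwise a bundle on an idle stream would never complete after 1 s.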
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already supported
>>>>>>>>>>>>>>>>>>>>>>>> and will be available in the upcoming 1.9 release. As Python UDFs are
>>>>>>>>>>>>>>>>>>>>>>>> very important for Python users, we'd like to start the discussion
>>>>>>>>>>>>>>>>>>>>>>>> about Python UDF support in the Python Table API. Aljoscha Krettek,
>>>>>>>>>>>>>>>>>>>>>>>> Dian Fu and I have discussed this offline and have drafted a design
>>>>>>>>>>>>>>>>>>>>>>>> doc [1]. It includes the following items:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
>>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> As mentioned by many people in the previous discussion thread [2], a
>>>>>>>>>>>>>>>>>>>>>>>> portability framework was introduced in recent Apache Beam releases. It
>>>>>>>>>>>>>>>>>>>>>>>> provides well-defined, language-neutral data structures and protocols
>>>>>>>>>>>>>>>>>>>>>>>> for language-neutral user-defined function execution. This design is
>>>>>>>>>>>>>>>>>>>>>>>> based on Beam's portability framework. We will describe how to make use
>>>>>>>>>>>>>>>>>>>>>>>> of Beam's portability framework for user-defined function execution:
>>>>>>>>>>>>>>>>>>>>>>>> data transmission, state access, checkpoints, metrics, logging, etc.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Since the design relies on Beam's portability framework for Python
>>>>>>>>>>>>>>>>>>>>>>>> user-defined function execution, and not all contributors in the Flink
>>>>>>>>>>>>>>>>>>>>>>>> community are familiar with it, we have built a prototype [3] as a
>>>>>>>>>>>>>>>>>>>>>>>> proof of concept and to make the design easier to understand.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
>>>>>>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>>
>>
>>

Re: [DISCUSS] Flink Python User-Defined Function for Table API

jincheng sun
Hi,

Sure. To make sure this gets into the Flink 1.10 release, let's split the
FLIPs and have FLIP-58 cover only the stateless part.

Cheers,
Jincheng

On Fri, Sep 6, 2019 at 5:53 PM Aljoscha Krettek <[hidden email]> wrote:

> Hi,
>
> Regarding stateful functions and MapView/DataView/ListView: I think it’s
> best to keep that for a later FLIP and focus on a more basic version.
> Supporting stateful functions, especially with MapView, can potentially be
> very slow so we have to see what we can do there.
>
> For the method names, I don’t know. If FLIP-64 passes they have to be
> changed. So we could use the final names right away, but I’m also fine with
> using the old method names for now.
>
> Best,
> Aljoscha
>
> > On 5. Sep 2019, at 12:40, jincheng sun <[hidden email]> wrote:
> >
> > Hi Aljoscha,
> >
> > Thanks for your comments!
> >
> > Regarding the FLIP scope, it seems that we have agreed on the design of
> > the stateless function support. What do you think about starting the
> > development of the stateless function support first and continuing the
> > discussion of the stateful function support? Or do you think we should
> > split the current FLIP into two FLIPs and discuss the stateful function
> > support in another thread?
> >
> > Currently, the design of the Python DataView/MapView/ListView interfaces
> > follows the Java/Scala naming conventions. Of course, we can continue to
> > discuss whether there are better solutions, e.g. using annotations.
> >
> > Regarding the magic logic needed to support DataView/MapView/ListView, it
> > will be done by the framework and is transparent to users. As I
> > understand it, the magic logic is unavoidable no matter what the
> > interfaces look like.
> >
> > Regarding catalog support for Python functions:
> > 1) If it's stored in memory as a temporary object, then, just as you
> > said, users can call TableEnvironment.register_function (which will be
> > renamed to register_temporary_function in FLIP-64).
> > 2) If it's persisted in external storage, users can call
> > Catalog.create_function. As I understand it, no API change is needed.
> >
> > What do you think?
> >
> > Best,
> > Jincheng
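The two cases jincheng lists for catalog support (an in-memory temporary function versus one persisted in external storage) can be sketched as a two-tier registry. Everything here is hypothetical: `FunctionRegistry` is not a PyFlink class, the dict stands in for external storage, stdlib `pickle` stands in for CloudPickle, and letting temporary functions shadow catalog ones is an assumption of this sketch.

```python
import pickle
from typing import Callable, Dict


class FunctionRegistry:
    """Hypothetical two-tier registry: temporary functions plus a persisted catalog."""

    def __init__(self) -> None:
        self._temporary: Dict[str, Callable] = {}
        self._catalog: Dict[str, bytes] = {}  # stands in for external storage

    def register_temporary_function(self, name: str, func: Callable) -> None:
        # Lives only as long as this registry (the session); nothing is serialized.
        self._temporary[name] = func

    def create_function(self, name: str, func: Callable) -> None:
        # Persisted functions must outlive the session, so they are serialized.
        self._catalog[name] = pickle.dumps(func)

    def lookup(self, name: str) -> Callable:
        # Assumption for this sketch: temporary functions shadow catalog functions.
        if name in self._temporary:
            return self._temporary[name]
        if name in self._catalog:
            return pickle.loads(self._catalog[name])
        raise KeyError(f"function {name!r} not found")


registry = FunctionRegistry()
registry.register_temporary_function("add_one", lambda i: i + 1)
registry.create_function("my_abs", abs)
print(registry.lookup("add_one")(41))  # 42
print(registry.lookup("my_abs")(-3))   # 3
```

Only the persisted path needs serialization, which is why the choice of format (discussed further down the thread) matters for catalog functions but not for temporary ones; stdlib `pickle` would reject the lambda above if it were passed to `create_function`.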
> >
> > On Thu, Sep 5, 2019 at 5:32 PM Aljoscha Krettek <[hidden email]> wrote:
> >
> >> Hi,
> >>
> >> Another thing to consider is the scope of the FLIP. Currently, we try to
> >> support (stateful) AggregateFunctions. I have some concerns about whether
> >> or not DataView/MapView/ListView is a good interface, because it requires
> >> quite some magic from the runners to make it work, such as messing with
> >> the TypeInformation and injecting objects at runtime. If the FLIP aims for
> >> the minimum of ScalarFunctions and the whole execution harness, that
> >> should be easier to agree on.
> >>
> >> Another point is the naming of the new methods. I think Timo hinted at
> >> the fact that we have to consider catalog support for functions. There is
> >> ongoing work on differentiating between temporary objects and objects
> >> that are stored in a catalog (FLIP-64 [1]). With this in mind, the method
> >> for registering functions should be called register_temporary_function()
> >> and so on, unless we already want to think about mixing Python and Java
> >> functions in the catalog, which I think is outside the scope of this FLIP.
> >>
> >> Best,
> >> Aljoscha
> >>
> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
> >>
> >> On 5. Sep 2019, at 05:01, jincheng sun <[hidden email]> wrote:
> >>>
> >>> Hi Aljoscha,
> >>>
> >>> That's a good point. So far, most of the code will live in the
> >>> flink-python module, the rules and RelNodes will be put into both the
> >>> Blink and the Flink planner modules, and some common interfaces required
> >>> by the planners will be placed in flink-table-common. I think you are
> >>> right that we should try to keep the changes for this feature minimal.
> >>> For the details, we will follow this principle when reviewing the PRs.
> >>>
> >>> Many thanks for your questions and the reminder!
> >>>
> >>> Best,
> >>> Jincheng
> >>>
> >>>
> >>> On Wed, Sep 4, 2019 at 8:58 PM Aljoscha Krettek <[hidden email]> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Things look interesting so far!
> >>>>
> >>>> I had one question: Where will most of the support code for this live?
> >>>> Will this add the required code to flink-table-common or the different
> >>>> runners? Can we implement this in such a way that only a minimal amount
> >>>> of support code is required in the parts of the Table API (and Table API
> >>>> runners) that are not Python-specific?
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
> >>>>>
> >>>>> Hi Jincheng,
> >>>>>
> >>>>> 2. Serializability of functions: "#2 is very convenient for users" only
> >>>>> holds until they hit their first backwards-compatibility issue; after
> >>>>> that they will not find it so convenient anymore and will ask why the
> >>>>> framework allowed storing such objects in persistent storage. I don't
> >>>>> want to be picky about it, but I wanted to raise awareness that
> >>>>> sometimes it is ok to limit use cases to guide users towards developing
> >>>>> backwards-compatible programs.
> >>>>>
> >>>>> Thanks for the explanation of the remaining items. It sounds reasonable
> >>>>> to me. Regarding the example with `getKind()`, I actually meant
> >>>>> `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't
> >>>>> allow users to override this property. And I think we should do
> >>>>> something similar for the getLanguage property.
> >>>>>
> >>>>> Thanks,
> >>>>> Timo
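Timo's backwards-compatibility concern can be reproduced with the standard library alone. In this sketch, stdlib `pickle` stands in for CloudPickle's by-reference path (per the thread, CloudPickle likewise stores only the module path and name for importable functions); the module names `udfs_v1`/`udfs_v2` and the helper `make_module` are hypothetical. A function persisted by reference stops loading as soon as its module path changes.

```python
import pickle
import sys
import types


def make_module(name: str, source: str) -> types.ModuleType:
    """Create and register an in-memory module (stands in for a user's .py file)."""
    module = types.ModuleType(name)
    exec(source, module.__dict__)
    sys.modules[name] = module
    return module


udfs = make_module("udfs_v1", "def add_one(i):\n    return i + 1\n")

# Importable functions are pickled by reference: only module path + name are stored.
blob = pickle.dumps(udfs.add_one)
assert b"udfs_v1" in blob and b"add_one" in blob
print(pickle.loads(blob)(41))  # 42

# Simulate a refactoring that moves the same code to a new module path.
del sys.modules["udfs_v1"]
make_module("udfs_v2", "def add_one(i):\n    return i + 1\n")

try:
    pickle.loads(blob)  # the stored reference still points at udfs_v1
except ModuleNotFoundError as err:
    print("stored function can no longer be loaded:", err)
```

The code itself did not change, only its location, yet every persisted copy is now unreadable; this is the situation Timo expects users to run into once such objects live in a catalog.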
> >>>>>
> >>>>> On 03.09.19 15:01, jincheng sun wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> Thanks for the quick reply! :)
> >>>>>> I have added more examples for #3 and #5 to the FLIP. Those are great
> >>>>>> suggestions!
> >>>>>>
> >>>>>> Regarding 2:
> >>>>>>
> >>>>>> There are two kinds of serialization in CloudPickle (which differs
> >>>>>> from Java):
> >>>>>> 1) For classes and functions that can be imported, CloudPickle only
> >>>>>> serializes the full path of the class or function (just like a Java
> >>>>>> class name).
> >>>>>> 2) For classes and functions that cannot be imported, CloudPickle
> >>>>>> serializes the full content of the class or function.
> >>>>>> #2 means that we cannot just store the full path of the class or
> >>>>>> function.
> >>>>>>
> >>>>>> The above serialization is recursive.
> >>>>>>
> >>>>>> However, there is indeed a backwards-compatibility problem when the
> >>>>>> module path of the parent class changes. But I think this is a rare
> >>>>>> case and acceptable. E.g., in the Flink framework we never change the
> >>>>>> module path of user-facing interfaces if we want to keep backwards
> >>>>>> compatibility. For user code, if users change the interface of a UDF's
> >>>>>> parent class, they should re-register their functions.
> >>>>>>
> >>>>>> If we do not want to support #2, we can store the full path of the
> >>>>>> class and function; in that case we have no backwards-compatibility
> >>>>>> problem. But I think #2 is very convenient for users.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Regarding 4:
> >>>>>> As I mentioned earlier, there may be built-in Python functions, and I
> >>>>>> think language is a "function" concept. Function and language are
> >>>>>> orthogonal concepts. We may have R, Go and other language functions in
> >>>>>> the future, not only user-defined but also built-in ones.
> >>>>>>
> >>>>>> You are right that users will not set this method; for Python
> >>>>>> functions, it will be set by the framework in the code-generated Java
> >>>>>> function. So I think we should declare getLanguage() in
> >>>>>> FunctionDefinition for now. (I'm not quite sure what you mean by
> >>>>>> saying that getKind() is final in UserDefinedFunction?)
> >>>>>>
> >>>>>> Best,
> >>>>>> Jincheng
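The two cases jincheng describes can be told apart with the standard library: stdlib `pickle` only supports the first (by-reference) case, which is why something like CloudPickle is needed for the second. The helper `serialized_by_reference` is a hypothetical name for this sketch, and it does not show CloudPickle's by-value format itself.

```python
import json
import pickle


def serialized_by_reference(func) -> bool:
    """True if stdlib pickle can store `func` as a bare module-path + name reference."""
    try:
        blob = pickle.dumps(func)
    except (pickle.PicklingError, AttributeError, TypeError):
        # No importable name: this is case 2), which needs by-value serialization.
        return False
    # Case 1): the payload holds the name, not the function's code.
    return func.__name__.encode() in blob


print(serialized_by_reference(json.dumps))        # True: stored as "json" + "dumps"
print(serialized_by_reference(lambda i: i + 1))   # False: a lambda has no importable name
```

Case 1) is compact and easy to evolve as long as the path stays stable; case 2) embeds the function body, which is more convenient for ad-hoc lambdas but ties the stored bytes to the serializer's format.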
> >>>>>>
> >>>>>> On Tue, Sep 3, 2019 at 6:01 PM Timo Walther <[hidden email]> wrote:
> >>>>>>
> >>>>>>> Hi Jincheng,
> >>>>>>>
> >>>>>>> thanks for your response.
> >>>>>>>
> >>>>>>> 2. Serializability of functions: Using some arbitrary serialization
> >>>>>>> format for shipping a function to the worker sounds fine to me. But once
> >>>>>>> we store functions in the catalog, we need to think about backwards
> >>>>>>> compatibility and evolution of interfaces etc. I'm not sure if
> >>>>>>> CloudPickle is the right long-term storage format for this. If we don't
> >>>>>>> think about this in advance, we are basically violating the rule in our
> >>>>>>> code quality guide [1] to never use Java serialization, just in the
> >>>>>>> Python way: we would be using the RPC serialization for persistence.
> >>>>>>>
> >>>>>>> 3. TableEnvironment: Can you add some examples to the FLIP? API code
> >>>>>>> like the following is not covered there:
> >>>>>>>
> >>>>>>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
> >>>>>>>                                             DataTypes.BIGINT(),
> >>>>>>>                                             DataTypes.BIGINT()))
> >>>>>>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
> >>>>>>>                                                  DataTypes.BIGINT(),
> >>>>>>>                                                  DataTypes.BIGINT()))
> >>>>>>> self.t_env.register_function("add", add)
> >>>>>>>
> >>>>>>> 4. FunctionDefinition: Your response still doesn't answer my question
> >>>>>>> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
> >>>>>>> "user-defined function" concept and not a "function" concept? In any
> >>>>>>> case, users should not be able to set this method. So it must be final
> >>>>>>> in UserDefinedFunction, similar to getKind().
> >>>>>>>
> >>>>>>> 5. Function characteristics: If UserDefinedFunction is defined in
> >>>>>>> Python, why is it not used in your example in FLIP-58? Could you
> >>>>>>> extend the example to show how to specify these attributes in the
> >>>>>>> FLIP?
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
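Timo's point 3 asks what a single `register_function` entry point should accept. One way to handle lambdas, plain functions and objects with an `eval` method uniformly is to normalize them in the wrapper returned by `udf`. This sketch is hypothetical throughout: `udf`, `UserDefinedFunctionWrapper` and `TableEnvSketch` are stand-ins rather than the PyFlink API, and `DataTypes.BIGINT()` is replaced by the string `"BIGINT"` so the example runs without Flink.

```python
from typing import Any, Callable, Dict, List, Optional


class UserDefinedFunctionWrapper:
    """Hypothetical wrapper pairing a callable with its declared types."""

    def __init__(self, func: Any, input_types: Optional[List[str]],
                 result_type: Optional[str]) -> None:
        # Objects with an eval() method (like SubtractOne()) are called via eval;
        # plain functions and lambdas are called directly.
        if hasattr(func, "eval"):
            self._call: Callable[..., Any] = func.eval
        elif callable(func):
            self._call = func
        else:
            raise TypeError(f"{func!r} is neither callable nor has an eval() method")
        self.input_types = input_types
        self.result_type = result_type

    def __call__(self, *args: Any) -> Any:
        return self._call(*args)


def udf(func: Any = None, input_types: Optional[List[str]] = None,
        result_type: Optional[str] = None) -> Any:
    """Usable as udf(f, types, result) or as a decorator @udf(input_types=..., ...)."""
    if func is None:
        return lambda f: UserDefinedFunctionWrapper(f, input_types, result_type)
    return UserDefinedFunctionWrapper(func, input_types, result_type)


class TableEnvSketch:
    def __init__(self) -> None:
        self.functions: Dict[str, UserDefinedFunctionWrapper] = {}

    def register_function(self, name: str, function: UserDefinedFunctionWrapper) -> None:
        self.functions[name] = function


class SubtractOne:
    def eval(self, i: int) -> int:
        return i - 1


@udf(input_types=["BIGINT", "BIGINT"], result_type="BIGINT")
def add(i: int, j: int) -> int:
    return i + j


t_env = TableEnvSketch()
t_env.register_function("add_one", udf(lambda i: i + 1, ["BIGINT"], "BIGINT"))
t_env.register_function("subtract_one", udf(SubtractOne(), ["BIGINT"], "BIGINT"))
t_env.register_function("add", add)
print(t_env.functions["add_one"](1), t_env.functions["subtract_one"](1),
      t_env.functions["add"](1, 2))  # 2 0 3
```

This sketch expects an instance such as `SubtractOne()`; accepting the class itself (Timo's `class Sum` case) would require the wrapper to instantiate it first.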
> >>>>>>>
> >>>>>>> On 02.09.19 15:35, jincheng sun wrote:
> >>>>>>>> Hi Timo,
> >>>>>>>>
> >>>>>>>> Many thanks for your feedback. I would like to share my thoughts with
> >>>>>>>> you inline. :)
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Jincheng
> >>>>>>>>
> >>>>>>>> On Mon, Sep 2, 2019 at 5:04 PM Timo Walther <[hidden email]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> the FLIP looks awesome. However, I would like to discuss the changes to
> >>>>>>>>> the user-facing parts again. Some feedback:
> >>>>>>>>>
> >>>>>>>>> 1. DataViews: With the current non-annotation design for DataViews, we
> >>>>>>>>> cannot perform eager state declaration, right? At which point during
> >>>>>>>>> execution do we know which state is required by the function? We need
> >>>>>>>>> to instantiate the function first, right?
> >>>>>>>>>
> >>>>>>>> We will analyze the Python AggregateFunction and extract the DataViews
> >>>>>>>> used in it. This can be done by instantiating the Python
> >>>>>>>> AggregateFunction, creating an accumulator by calling the method
> >>>>>>>> create_accumulator, and then analyzing the created accumulator. This is
> >>>>>>>> actually similar to how the code generation logic processes a Java
> >>>>>>>> AggregateFunction. The extracted DataViews can then be used to construct
> >>>>>>>> the StateDescriptors in the operator, i.e., the Java operator should
> >>>>>>>> hold the state spec and the state descriptor id, and the Python worker
> >>>>>>>> can access the state by specifying the corresponding state descriptor
> >>>>>>>> id.
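The extraction step described in the answer above (instantiate the aggregate, call `create_accumulator`, inspect the result, derive state specs with ids) can be sketched as follows. `MapView`, `ListView`, `DistinctCount` and the `"<function>.<field>"` id scheme are hypothetical stand-ins chosen for illustration. The sketch also makes Timo's point visible: nothing is known about the required state until an accumulator has actually been created.

```python
from typing import Any, Dict, List, Tuple


class MapView:
    """Hypothetical stand-in for a state-backed map used inside an accumulator."""
    def __init__(self) -> None:
        self.data: Dict[Any, Any] = {}


class ListView:
    """Hypothetical stand-in for a state-backed list used inside an accumulator."""
    def __init__(self) -> None:
        self.data: List[Any] = []


class DistinctCountAccumulator:
    def __init__(self) -> None:
        self.seen = MapView()  # should become keyed state
        self.count = 0         # plain value, stays in the accumulator


class DistinctCount:
    """Example aggregate function; only create_accumulator matters here."""
    def create_accumulator(self) -> DistinctCountAccumulator:
        return DistinctCountAccumulator()


def extract_state_specs(agg_function: Any) -> List[Tuple[str, str, str]]:
    """Return (state_id, field_name, view_type) for every DataView in a fresh accumulator."""
    accumulator = agg_function.create_accumulator()
    specs = []
    for field_name, value in sorted(vars(accumulator).items()):
        if isinstance(value, (MapView, ListView)):
            state_id = f"{type(agg_function).__name__}.{field_name}"
            specs.append((state_id, field_name, type(value).__name__))
    return specs


print(extract_state_specs(DistinctCount()))
# [('DistinctCount.seen', 'seen', 'MapView')]
```

In the design described above, specs like these would be held by the Java operator to build the StateDescriptors, and the Python worker would refer to state by the corresponding id.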
> >>>>>>>>
> >>>>>>>>> 2. Serializability of functions: How do we ensure serializability of
> >>>>>>>>> functions for catalog persistence? In the Scala/Java API, we would like
> >>>>>>>>> to register classes instead of instances soon. This is the only way to
> >>>>>>>>> store a function properly in a catalog, or we need some
> >>>>>>>>> serialization/deserialization logic in the function interfaces to
> >>>>>>>>> convert an instance to string properties.
> >>>>>>>>>
> >>>>>>>> The Python function will be serialized with CloudPickle anyway in the
> >>>>>>>> Python API, as we need to transfer it to the Python worker, which can
> >>>>>>>> then deserialize it for execution. The serialized Python function can
> >>>>>>>> be stored in the catalog.
> >>>>>>>>
> >>>>>>>>> 3. TableEnvironment: What is the signature of `register_function(self,
> >>>>>>>>> name, function)`? Does it accept both a class and a function? Like
> >>>>>>>>> `class Sum` and `def split()`? Could you add some examples for
> >>>>>>>>> registering both kinds of functions?
> >>>>>>>>>
> >>>>>>>> What you mentioned is already supported. You can find an example in
> >>>>>>>> the POC code:
> >>>>>>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
> >>>>>>>>
> >>>>>>>>> 4. FunctionDefinition: Function definition is not a user-defined
> >>>>>>>>> function definition. It is the highest interface for both user-defined
> >>>>>>>>> and built-in functions. I'm not sure if getLanguage() should be part of
> >>>>>>>>> this interface or one level down, which would be `UserDefinedFunction`.
> >>>>>>>>> Built-in functions will never be implemented in a different language.
> >>>>>>>>> In any case, I would vote for removing the UNKNOWN language, because it
> >>>>>>>>> does not solve anything. Why should a user declare a function that the
> >>>>>>>>> runtime cannot handle? I also find the term `JAVA` confusing for Scala
> >>>>>>>>> users. How about `FunctionLanguage.JVM` instead?
> >>>>>>>>>
> >>>>>>>> Actually, we may have built-in Python functions in the future. Consider
> >>>>>>>> the expression py_udf1(a, b) + py_udf2(c): if there is a built-in
> >>>>>>>> Python function for the '+' operator, then we don't need to mix Java
> >>>>>>>> and Python UDFs. In this way, we can improve the execution performance.
> >>>>>>>> Removing FunctionLanguage.UNKNOWN and renaming FunctionLanguage.Java to
> >>>>>>>> FunctionLanguage.JVM makes more sense to me.
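The performance argument for a built-in Python '+' can be made concrete by counting how often execution would have to cross between the JVM and the Python worker for py_udf1(a, b) + py_udf2(c). This is a toy model (`Call`, `count_boundaries` and the language tags are hypothetical), not the planner's actual logic.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Call:
    """A node in an expression tree, tagged with the language that executes it."""
    name: str
    language: str  # "PYTHON" or "JVM"
    args: List["Call"] = field(default_factory=list)


def count_boundaries(node: Call) -> int:
    """Count parent/child edges where execution has to switch languages."""
    crossings = 0
    for child in node.args:
        if child.language != node.language:
            crossings += 1
        crossings += count_boundaries(child)
    return crossings


def plan(plus_language: str) -> Call:
    # py_udf1(a, b) + py_udf2(c); column references are omitted for brevity.
    return Call("+", plus_language, [Call("py_udf1", "PYTHON"), Call("py_udf2", "PYTHON")])


print(count_boundaries(plan("JVM")))     # 2: each UDF result crosses back to the JVM
print(count_boundaries(plan("PYTHON")))  # 0: the whole expression can run in Python
```

Each crossing implies shipping data between processes, so removing them is where the performance gain mentioned above would come from.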
> >>>>>>>>
> >>>>>>>>> 5. Function characteristics: In the current design, function classes
> >>>>>>>>> do not extend from any upper class. How can users declare
> >>>>>>>>> characteristics that are present in `FunctionDefinition`, like
> >>>>>>>>> determinism, requirements, or soon also monotonicity?
> >>>>>>>>>
> >>>>>>>> Actually, we have defined 'UserDefinedFunction', which is the base class
> >>>>>>>> for all user-defined functions. We can define determinism,
> >>>>>>>> requirements, etc. in this class. Declaring determinism is already
> >>>>>>>> supported.
> >>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Timo
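Points 4 and 5 of Timo's mail, together with the answers, can be combined in one sketch: users declare characteristics such as determinism by overriding a method on a base class, while the language is fixed by the framework and cannot be overridden. Python has no `final`, so `__init_subclass__` emulates it here. `UserDefinedFunction`, `FunctionLanguage` and the method names are hypothetical Python analogues of the Java interfaces being discussed, not their actual definitions.

```python
import random
from enum import Enum


class FunctionLanguage(Enum):
    # As discussed: no UNKNOWN member, and JVM rather than JAVA.
    JVM = "JVM"
    PYTHON = "PYTHON"


class UserDefinedFunction:
    """Hypothetical Python base class for all user-defined functions."""

    _FINAL_METHODS = ("get_language",)

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Emulate Java's `final`: reject subclasses that override these methods.
        for name in UserDefinedFunction._FINAL_METHODS:
            if name in cls.__dict__:
                raise TypeError(f"{cls.__name__} must not override {name}()")

    def get_language(self) -> FunctionLanguage:
        # Fixed by the framework; users cannot change it.
        return FunctionLanguage.PYTHON

    def is_deterministic(self) -> bool:
        # Users declare characteristics such as determinism by overriding this.
        return True


class AddNoise(UserDefinedFunction):
    def eval(self, x: float) -> float:
        return x + random.random()

    def is_deterministic(self) -> bool:
        return False


f = AddNoise()
print(f.is_deterministic(), f.get_language())  # False FunctionLanguage.PYTHON

try:
    class Sneaky(UserDefinedFunction):
        def get_language(self) -> FunctionLanguage:
            return FunctionLanguage.JVM
except TypeError as err:
    print(err)  # Sneaky must not override get_language()
```

The check runs when a subclass is defined, so a forbidden override fails at import time rather than when the function is first executed.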
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
> >>>>>>>>>> Hi Jincheng, Fudian, and Aljoscha,
> >>>>>>>>>> I am assuming the proposed Python UDX can also be applied to Flink
> >>>>>>>>>> SQL. Is this correct? If yes, I would suggest titling the FLIP "Flink
> >>>>>>>>>> Python User-Defined Function" or "Flink Python User-Defined Function
> >>>>>>>>>> for Table".
> >>>>>>>>>> Regards,
> >>>>>>>>>> Shaoxuan
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks for the feedback, Bowen!
> >>>>>>>>>>>
> >>>>>>>>>>> Many thanks for creating the FLIP and bringing up the VOTE, Dian!
> >>>>>>>>>>>
> >>>>>>>>>>> Best, Jincheng
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Aug 28, 2019 at 11:32 AM Dian Fu <[hidden email]> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have started a voting thread [1]. Thanks a lot for your help in
> >>>>>>>>>>>> creating the FLIP, @Jincheng.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Bowen,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Many thanks for your comments. I have replied to you in the design
> >>>>>>>>>>>> doc. As the comments don't seem to affect the overall design, I
> >>>>>>>>>>>> won't cancel the vote for now, and we can continue the discussion in
> >>>>>>>>>>>> the design doc.
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Dian
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Aug 28, 2019, at 11:05 AM, Bowen Li <[hidden email]> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Jincheng and Dian,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Sorry for being late to the party. I took a glance at the proposal;
> >>>>>>>>>>>>> it looks good to me in general, and I left only a couple of
> >>>>>>>>>>>>> comments.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Bowen
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]> wrote:
> >>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks! It works.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Aug 27, 2019, at 10:55 AM, jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 10:52 AM Dian Fu <[hidden email]> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I appreciate the kind tips and the offer of help. I definitely need
> >>>>>>>>>>>>>>>> it! Could you grant me write permission for Confluence? My ID: Dian Fu
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Aug 26, 2019, at 9:53 AM, jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for your feedback, Hequn & Dian.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
> >>>>>>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you
> >>>>>>>>>>>>>>>>> complete your first FLIP. Here are some tips:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
> >>>>>>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP Template
> >>>>>>>>>>>>>>>>>   [1] (it's better to learn more about FLIPs by reading [2]).
> >>>>>>>>>>>>>>>>> - Create the Flink Python UDF related JIRAs after the VOTE on the
> >>>>>>>>>>>>>>>>>   FLIP is complete. (I think you can also bring up the VOTE thread,
> >>>>>>>>>>>>>>>>>   if you want!)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell
> >>>>>>>>>>>>>>>>> me so that we can solve them together. :)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
> >>>>>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 11:54 AM Hequn Cheng <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> +1 for starting the vote.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks a lot, Jincheng, for the discussion.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best, Hequn
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> +1 to creating the FLIP and starting the VOTE on this feature. I'm
> >>>>>>>>>>>>>>>>>>> willing to help create the FLIP if you don't mind. As I haven't
> >>>>>>>>>>>>>>>>>>> created a FLIP before, it would be great if you could help me with
> >>>>>>>>>>>>>>>>>>> this. :)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions
> >>>>>>>>>>>>>>>>>>>> or comments, I think it's better to initiate a vote to create a
> >>>>>>>>>>>>>>>>>>>> FLIP for Apache Flink Python UDFs.
> >>>>>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Best, Jincheng
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Thu, Aug 15, 2019 at 12:54 AM jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder about
> >>>>>>>>>>>>>>>>>>>>> bundle processing.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I have added a description of how bundle processing is performed
> >>>>>>>>>>>>>>>>>>>>> from the perspective of checkpoints and watermarks. Feel free to
> >>>>>>>>>>>>>>>>>>>>> leave comments if anything is not described clearly.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Dian Fu <[hidden email]> 于2019年8月14日周三
> >> 上午10:08写道:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks a lot the suggestions.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regarding to bundle processing, there is a section
> >>>>>>>>>>>> "Checkpoint"[1]
> >>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> design doc which talks about how to handle the
> >>>> checkpoint.
> >>>>>>>>>>>>>>>>>>>>>> However, I think you are right that we should talk
> >> more
> >>>>>>> about
> >>>>>>>>>>>> it,
> >>>>>>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>> what's bundle processing, how it affects the
> >> checkpoint
> >>>> and
> >>>>>>>>>>>>>>>>>> watermark,
> >>>>>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>>>> to handle the checkpoint and watermark, etc.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>
> >>>>
> >>
> https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
> >>>>>>>>>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>
> >>>>
> >>
> https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
> >>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>> Dian
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 在 2019年8月14日,上午1:01,Thomas Weise <[hidden email]>
> >> 写道:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is
> >> very
> >>>>>>>>>>>> detailed,
> >>>>>>>>>>>>>>>>>>>>>> thorough
> >>>>>>>>>>>>>>>>>>>>>>> and for me as a Beam Flink runner contributor easy
> to
> >>>>>>>>>>>> understand
> >>>>>>>>>>>>>> :)
> >>>>>>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is
> the
> >>>>>>> bundle
> >>>>>>>>>>>>>>>>>>>>>> processing. It
> >>>>>>>>>>>>>>>>>>>>>>> is critically important for performance that
> multiple
> >>>>>>>>>>> elements
> >>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>> processed in a bundle. The default bundle size in
> the
> >>>>>>> Flink
> >>>>>>>>>>>>>> runner
> >>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> 1s or
> >>>>>>>>>>>>>>>>>>>>>>> 1000 elements, whichever comes first. And for
> >>>> streaming,
> >>>>>>> you
> >>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>> find
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> logic necessary to align the bundle processing with
> >>>>>>>>>>> watermarks
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>> checkpointing here:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>
> >>>>
> >>
> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
> >>>>>>>>>>>>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already supported
> >>>>>>>>>>>>>>>>>>>>>>>> and will be available in the upcoming 1.9 release.
> >>>>>>>>>>>>>>>>>>>>>>>> As Python UDFs are very important for Python users, we'd like to start
> >>>>>>>>>>>>>>>>>>>>>>>> the discussion about Python UDF support in the Python Table API.
> >>>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek, Dian Fu and I have discussed this offline and have
> >>>>>>>>>>>>>>>>>>>>>>>> drafted a design doc[1]. It includes the following items:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
> >>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> As mentioned by many people in the previous discussion thread[2], a
> >>>>>>>>>>>>>>>>>>>>>>>> portability framework was introduced in Apache Beam in its latest
> >>>>>>>>>>>>>>>>>>>>>>>> releases. It provides well-defined, language-neutral data structures and
> >>>>>>>>>>>>>>>>>>>>>>>> protocols for language-neutral user-defined function execution. This
> >>>>>>>>>>>>>>>>>>>>>>>> design is based on Beam's portability framework. We will introduce how
> >>>>>>>>>>>>>>>>>>>>>>>> to make use of Beam's portability framework for user-defined function
> >>>>>>>>>>>>>>>>>>>>>>>> execution: data transmission, state access, checkpoint, metrics,
> >>>>>>>>>>>>>>>>>>>>>>>> logging, etc.
> >>>>>>>>>>>>>>>>>>>>>>>> Considering that the design relies on Beam's portability framework for
> >>>>>>>>>>>>>>>>>>>>>>>> Python user-defined function execution and not all contributors in the
> >>>>>>>>>>>>>>>>>>>>>>>> Flink community are familiar with it, we have built a prototype[3] as a
> >>>>>>>>>>>>>>>>>>>>>>>> proof of concept and to make the design easier to understand.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>> Jincheng
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
> >>>>>>>>>>>>>>>>>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
> >>>>>>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc

Re: [DISCUSS] Flink Python User-Defined Function for Table API

Dian Fu-2
Hi all,

Thanks a lot for the discussion here. It makes sense to limit the scope of this FLIP to only ScalarFunction. I'll update the FLIP and remove the content relating to UDAF.

Thanks,
Dian

> On Sep 6, 2019, at 6:02 PM, jincheng sun <[hidden email]> wrote:
>
> Hi,
>
> Sure, to ensure the Flink 1.10 release, let's split the FLIPs and have
> FLIP-58 cover only the stateless part.
>
> Cheers,
> Jincheng
>
> On Fri, Sep 6, 2019 at 5:53 PM Aljoscha Krettek <[hidden email]> wrote:
>
>> Hi,
>>
>> Regarding stateful functions and MapView/DataView/ListView: I think it’s
>> best to keep that for a later FLIP and focus on a more basic version.
>> Supporting stateful functions, especially with MapView, can potentially be
>> very slow, so we have to see what we can do there.
>>
>> For the method names, I don’t know. If FLIP-64 passes they have to be
>> changed. So we could use the final names right away, but I’m also fine with
>> using the old method names for now.
>>
>> Best,
>> Aljoscha
>>
>>> On 5. Sep 2019, at 12:40, jincheng sun <[hidden email]> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> Thanks for your comments!
>>>
>>> Regarding the FLIP scope, it seems that we have agreed on the design of
>>> the stateless function support.
>>> What do you think about starting the development of the stateless function
>>> support first and continuing the discussion of stateful function support?
>>> Or do you think we should split the current FLIP into two FLIPs and discuss
>>> the stateful function support in another thread?
>>>
>>> Currently, the design of the Python DataView/MapView/ListView interfaces
>>> follows the Java/Scala naming conventions.
>>> Of course, we can continue to discuss whether there are better solutions,
>>> e.g. using annotations.
>>>
>>> Regarding the magic logic to support DataView/MapView/ListView, it will
>>> be done by the framework and is transparent to users.
>>> In my understanding, the magic logic is unavoidable no matter what the
>>> interfaces will be.
>>>
>>> Regarding the catalog support of Python functions:
>>> 1) If it's stored in memory as a temporary object, just as you said, users
>>> can call TableEnvironment.register_function (which will change to
>>> register_temporary_function in FLIP-64).
>>> 2) If it's persisted in external storage, users can call
>>> Catalog.create_function. In my understanding, there will be no API change.
>>>
>>> What do you think?
>>> Best,
>>> Jincheng
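The two registration paths Jincheng describes (an in-memory temporary function versus a function persisted through the catalog) can be sketched as follows. This is a hypothetical illustration, not the PyFlink API: `FunctionCatalogSketch` and its methods are invented names, stdlib `pickle` stands in for CloudPickle, and the rule that a temporary function shadows a catalog function with the same name is an assumption modelled on the FLIP-64 direction.

```python
import pickle
from typing import Any, Callable, Dict


class FunctionCatalogSketch:
    """Hypothetical registry: temporary functions in memory, catalog functions as bytes."""

    def __init__(self) -> None:
        self._temporary: Dict[str, Callable[..., Any]] = {}
        # Stand-in for an external catalog store that only holds serialized payloads.
        self._persisted: Dict[str, bytes] = {}

    def register_temporary_function(self, name: str, func: Callable[..., Any]) -> None:
        # Lives only as long as this session; nothing is serialized.
        self._temporary[name] = func

    def create_function(self, name: str, func: Callable[..., Any]) -> None:
        # Persisted form is the serialized function (stdlib pickle here, CloudPickle in the proposal).
        self._persisted[name] = pickle.dumps(func)

    def lookup(self, name: str) -> Callable[..., Any]:
        # Assumption: a temporary function shadows a catalog function of the same name.
        if name in self._temporary:
            return self._temporary[name]
        return pickle.loads(self._persisted[name])


if __name__ == "__main__":
    catalog = FunctionCatalogSketch()
    catalog.create_function("absolute", abs)  # importable, so it pickles by reference
    catalog.register_temporary_function("add_one", lambda i: i + 1)
    print(catalog.lookup("absolute")(-3), catalog.lookup("add_one")(41))  # 3 42
```

Calling `create_function` with a lambda would fail under stdlib `pickle`; that gap is what CloudPickle's by-value mode fills.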
>>>
>>> On Thu, Sep 5, 2019 at 5:32 PM Aljoscha Krettek <[hidden email]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Another thing to consider is the scope of the FLIP. Currently, we try to
>>>> support (stateful) AggregateFunctions. I have some concerns about whether
>>>> or not DataView/MapView/ListView is a good interface because it requires
>>>> quite some magic from the runners to make it work, such as messing with the
>>>> TypeInformation and injecting objects at runtime. If the FLIP aims for the
>>>> minimum of ScalarFunctions and the whole execution harness, that should be
>>>> easier to agree on.
>>>>
>>>> Another point is the naming of the new methods. I think Timo hinted at the
>>>> fact that we have to consider catalog support for functions. There is
>>>> ongoing work about differentiating between temporary objects and objects
>>>> that are stored in a catalog (FLIP-64 [1]). With this in mind, the method
>>>> for registering functions should be called register_temporary_function()
>>>> and so on. Unless we want to already think about mixing Python and Java
>>>> functions in the catalog, which is outside the scope of this FLIP, I think.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
>>>>
>>>>
>>>>> On 5. Sep 2019, at 05:01, jincheng sun <[hidden email]> wrote:
>>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> That's a good point. So far, most of the code will live in the
>>>>> flink-python module, the rules and RelNodes will be put into both the
>>>>> blink and flink planner modules, and some common interfaces required by
>>>>> the planners will be placed in flink-table-common. I think you are right
>>>>> that we should try to keep the changes for this feature minimal. We will
>>>>> follow this principle in detail when reviewing the PRs.
>>>>>
>>>>> Many thanks for your questions and the reminder!
>>>>>
>>>>> Best,
>>>>> Jincheng
>>>>>
>>>>>
>>>>> On Wed, Sep 4, 2019 at 8:58 PM Aljoscha Krettek <[hidden email]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Things look interesting so far!
>>>>>>
>>>>>> I had one question: Where will most of the support code for this live?
>>>>>> Will this add the required code to flink-table-common or the different
>>>>>> runners? Can we implement this in such a way that only a minimal amount of
>>>>>> support code is required in the parts of the Table API (and Table API
>>>>>> runners) that are not Python specific?
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>> On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hi Jincheng,
>>>>>>>
>>>>>>> 2. Serializability of functions: "#2 is very convenient for users"
>>>>>>> holds only until they have the first backwards-compatibility issue;
>>>>>>> after that they will find it not so convenient anymore and will ask why
>>>>>>> the framework allowed storing such objects in a persistent storage. I
>>>>>>> don't want to be picky about it, but wanted to raise awareness that
>>>>>>> sometimes it is ok to limit use cases to guide users toward developing
>>>>>>> backwards-compatible programs.
>>>>>>>
>>>>>>> Thanks for the explanation of the remaining items. It sounds reasonable
>>>>>>> to me. Regarding the example with `getKind()`, I actually meant
>>>>>>> `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow
>>>>>>> users to override this property. And I think we should do something
>>>>>>> similar for the getLanguage property.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Timo
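Timo's point that the language should be fixed by the framework rather than overridden by users (just as `ScalarFunction#getKind` is `final` in Java) has no direct keyword equivalent in Python: `typing.final` is only checked by static type checkers. A minimal runtime sketch, assuming a hypothetical base class `FrameworkOwnedFunction` and method names `get_language()`/`get_kind()` chosen for illustration, rejects such overrides at the moment the subclass is defined:

```python
class FrameworkOwnedFunction:
    """Hypothetical base class whose language and kind are set by the framework."""

    # Methods that user subclasses are not allowed to redefine.
    _FRAMEWORK_OWNED = ("get_language", "get_kind")

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        for name in cls._FRAMEWORK_OWNED:
            if name in vars(cls):
                raise TypeError(f"{cls.__name__} must not override {name}()")

    def get_language(self) -> str:
        return "PYTHON"

    def get_kind(self) -> str:
        return "SCALAR"


class AddOne(FrameworkOwnedFunction):
    def eval(self, i):
        return i + 1


if __name__ == "__main__":
    print(AddOne().get_language())  # PYTHON
    try:
        class Sneaky(FrameworkOwnedFunction):
            def get_language(self):
                return "JVM"
    except TypeError as err:
        print(err)  # Sneaky must not override get_language()
```

The check runs once per class definition, so it adds no per-call cost.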
>>>>>>>
>>>>>>> On 03.09.19 15:01, jincheng sun wrote:
>>>>>>>> Hi Timo,
>>>>>>>>
>>>>>>>> Thanks for the quick reply! :)
>>>>>>>> I have added more examples for #3 and #5 to the FLIP. Those are great
>>>>>>>> suggestions!
>>>>>>>>
>>>>>>>> Regarding 2:
>>>>>>>>
>>>>>>>> There are two kinds of serialization in CloudPickle (which is different
>>>>>>>> from Java):
>>>>>>>> 1) For classes and functions that can be imported, CloudPickle only
>>>>>>>> serializes the full path of the class or function (just like a Java
>>>>>>>> class name).
>>>>>>>> 2) For classes and functions that cannot be imported, CloudPickle
>>>>>>>> serializes the full content of the class or function.
>>>>>>>> Because of #2, we cannot simply store the full path of the class or
>>>>>>>> function.
>>>>>>>>
>>>>>>>> The above serialization is recursive.
>>>>>>>>
>>>>>>>> However, there is indeed a backwards-compatibility problem when the
>>>>>>>> module path of the parent class changes. But I think this is a rare
>>>>>>>> case and acceptable. E.g., in the Flink framework we never change the
>>>>>>>> module path of user-facing interfaces if we want to keep backwards
>>>>>>>> compatibility. For user code, if users change the interface of a UDF's
>>>>>>>> parent class, they should re-register their functions.
>>>>>>>>
>>>>>>>> If we do not want to support #2, we can store the full path of the
>>>>>>>> class and function; in that case we have no backwards-compatibility
>>>>>>>> problem. But I think #2 is very convenient for users.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Regarding 4:
>>>>>>>> As I mentioned earlier, there may be built-in Python functions, and I
>>>>>>>> think language is a "function" concept. Function and language are
>>>>>>>> orthogonal concepts.
>>>>>>>> We may have R, Go and other language functions in the future, not only
>>>>>>>> user-defined, but also built-in functions.
>>>>>>>>
>>>>>>>> You are right that users will not set this method; for Python
>>>>>>>> functions, it will be set by the framework in the code-generated Java
>>>>>>>> function. So I think we should declare getLanguage() in
>>>>>>>> FunctionDefinition for now.
>>>>>>>> (I'm not quite sure what you mean by saying that getKind() is final in
>>>>>>>> UserDefinedFunction?)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jincheng
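The two CloudPickle cases Jincheng distinguishes can be observed with the standard library alone, because stdlib `pickle` uses the same by-reference strategy for importable functions. The sketch is illustrative only: `pickled_by_reference` is a hypothetical helper and CloudPickle itself is not used. An importable function such as `math.sqrt` is stored as just its module and name; a lambda cannot be looked up by name, so stdlib `pickle` rejects it, which is exactly the case where CloudPickle instead serializes the function body by value.

```python
import math
import pickle


def pickled_by_reference(func) -> bool:
    """Return True if stdlib pickle stores `func` only as module path + name.

    Importable functions are written as a (module, qualified name) pair,
    the same "full path" strategy CloudPickle uses for importable objects.
    """
    try:
        payload = pickle.dumps(func)
    except (pickle.PicklingError, AttributeError):
        # Not importable (e.g. a lambda or a locally defined function): stdlib
        # pickle gives up, whereas CloudPickle would serialize the code by value.
        return False
    return (func.__module__.encode() in payload
            and func.__qualname__.encode() in payload)


if __name__ == "__main__":
    print(pickled_by_reference(math.sqrt))        # True: stored as "math" + "sqrt"
    print(pickled_by_reference(lambda i: i + 1))  # False: not importable
```

A by-reference payload breaks as soon as the module path changes, which is the backwards-compatibility risk discussed in this thread.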
>>>>>>>>
>>>>>>>> On Tue, Sep 3, 2019 at 6:01 PM Timo Walther <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi Jincheng,
>>>>>>>>>
>>>>>>>>> thanks for your response.
>>>>>>>>>
>>>>>>>>> 2. Serializability of functions: Using some arbitrary serialization
>>>>>>>>> format for shipping a function to the workers sounds fine to me. But
>>>>>>>>> once we store functions in the catalog, we need to think about
>>>>>>>>> backwards compatibility and evolution of interfaces etc. I'm not sure
>>>>>>>>> if CloudPickle is the right long-term storage format for this. If we
>>>>>>>>> don't think about this in advance, we are basically violating our code
>>>>>>>>> quality guide [1] of never using Java serialization, just in the Python
>>>>>>>>> way: we would be using the RPC serialization for persistence.
>>>>>>>>>
>>>>>>>>> 3. TableEnvironment: Can you add some examples to the FLIP? API code
>>>>>>>>> like the following is not covered there:
>>>>>>>>>
>>>>>>>>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
>>>>>>>>>                                             DataTypes.BIGINT(),
>>>>>>>>>                                             DataTypes.BIGINT()))
>>>>>>>>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
>>>>>>>>>                                                  DataTypes.BIGINT(),
>>>>>>>>>                                                  DataTypes.BIGINT()))
>>>>>>>>> self.t_env.register_function("add", add)
>>>>>>>>>
>>>>>>>>> 4. FunctionDefinition: Your response still doesn't answer my question
>>>>>>>>> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
>>>>>>>>> "user-defined function" concept and not a "function" concept? In any
>>>>>>>>> case, no user should be able to set this method. So it must be final
>>>>>>>>> in UserDefinedFunction, similar to getKind().
>>>>>>>>>
>>>>>>>>> 5. Function characteristics: If UserDefinedFunction is defined in
>>>>>>>>> Python, why is it not used in your example in FLIP-58? Could you
>>>>>>>>> extend the example to show how to specify these attributes in the
>>>>>>>>> FLIP?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
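Timo's snippet registers three kinds of functions: a lambda, an instance of a class, and a decorated function. The sketch below shows one way a single `register_function` could accept all three by normalizing them in a `udf()` wrapper. It is a hypothetical stand-in, not PyFlink: `ScalarFunction`, `udf`, `ScalarUdfWrapper` and `FunctionRegistry` are illustrative names here, and types are plain strings instead of `DataTypes`.

```python
from typing import Any, Dict, List, Optional


class ScalarFunction:
    """Hypothetical base class: subclasses implement eval()."""

    def eval(self, *args: Any) -> Any:
        raise NotImplementedError


class ScalarUdfWrapper:
    """What the hypothetical udf() returns: a callable plus its declared types."""

    def __init__(self, func: Any, input_types: List[str], result_type: Optional[str]) -> None:
        # Accept either a ScalarFunction instance or any plain callable (def or lambda).
        self._call = func.eval if isinstance(func, ScalarFunction) else func
        self.input_types = input_types
        self.result_type = result_type

    def __call__(self, *args: Any) -> Any:
        return self._call(*args)


def udf(f: Any = None, input_types: Optional[List[str]] = None,
        result_type: Optional[str] = None):
    """Works both as udf(f, ...) and as a decorator @udf(input_types=..., result_type=...)."""
    if f is None:
        return lambda func: ScalarUdfWrapper(func, input_types or [], result_type)
    return ScalarUdfWrapper(f, input_types or [], result_type)


class FunctionRegistry:
    """Hypothetical stand-in for the table environment's function registration."""

    def __init__(self) -> None:
        self._functions: Dict[str, ScalarUdfWrapper] = {}

    def register_function(self, name: str, function: ScalarUdfWrapper) -> None:
        if not isinstance(function, ScalarUdfWrapper):
            raise TypeError(f"{name!r} must be wrapped with udf() before registration")
        self._functions[name] = function

    def call(self, name: str, *args: Any) -> Any:
        return self._functions[name](*args)


class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1


@udf(input_types=["BIGINT", "BIGINT"], result_type="BIGINT")
def add(i, j):
    return i + j


registry = FunctionRegistry()
registry.register_function("add_one", udf(lambda i: i + 1, ["BIGINT"], "BIGINT"))
registry.register_function("subtract_one", udf(SubtractOne(), ["BIGINT"], "BIGINT"))
registry.register_function("add", add)
```

Normalizing in `udf()` keeps `register_function` itself agnostic of how the user wrote the function.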
>>>>>>>>>
>>>>>>>>> On 02.09.19 15:35, jincheng sun wrote:
>>>>>>>>>> Hi Timo,
>>>>>>>>>>
>>>>>>>>>> Many thanks for your feedback. I would like to share my thoughts with
>>>>>>>>>> you inline. :)
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jincheng
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 2, 2019 at 5:04 PM Timo Walther <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> the FLIP looks awesome. However, I would like to discuss the changes to
>>>>>>>>>>> the user-facing parts again. Some feedback:
>>>>>>>>>>>
>>>>>>>>>>> 1. DataViews: With the current non-annotation design for DataViews, we
>>>>>>>>>>> cannot perform eager state declaration, right? At which point during
>>>>>>>>>>> execution do we know which state is required by the function? We need
>>>>>>>>>>> to instantiate the function first, right?
>>>>>>>>>>>
>>>>>>>>>> We will analyze the Python AggregateFunction and extract the DataViews
>>>>>>>>>> used in it. This can be done by instantiating the Python
>>>>>>>>>> AggregateFunction, creating an accumulator by calling the method
>>>>>>>>>> create_accumulator, and then analyzing the created accumulator. This is
>>>>>>>>>> actually similar to the way the codegen logic processes a Java
>>>>>>>>>> AggregateFunction. The extracted DataViews can then be used to
>>>>>>>>>> construct the StateDescriptors in the operator, i.e., the Java operator
>>>>>>>>>> should hold the state spec and the state descriptor id, and the Python
>>>>>>>>>> worker can access the state by specifying the corresponding state
>>>>>>>>>> descriptor id.
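The extraction step described in this answer (instantiate the function, call `create_accumulator`, inspect the result) might look roughly like the sketch below. All names are hypothetical: `ListView`, `MapView`, `CountDistinct` and `extract_data_views` are illustrative stand-ins, and numbering the views in field-name order is just one possible way to assign state descriptor ids that the Java operator and the Python worker could agree on.

```python
from typing import Dict, Tuple


class ListView:
    """Hypothetical stand-in for a list-state handle inside an accumulator."""


class MapView:
    """Hypothetical stand-in for a map-state handle inside an accumulator."""


class CountDistinctAccumulator:
    def __init__(self) -> None:
        self.count = 0             # plain value, kept in the accumulator itself
        self.distinct = MapView()  # would be backed by keyed state
        self.history = ListView()  # would be backed by keyed state


class CountDistinct:
    """Hypothetical Python AggregateFunction."""

    def create_accumulator(self) -> CountDistinctAccumulator:
        return CountDistinctAccumulator()


def extract_data_views(agg_function) -> Dict[str, Tuple[int, str]]:
    """Map each DataView field of a fresh accumulator to (state_descriptor_id, view_kind)."""
    accumulator = agg_function.create_accumulator()
    specs: Dict[str, Tuple[int, str]] = {}
    # Sorting by field name makes the ids reproducible every time the function is analyzed.
    for field_name, value in sorted(vars(accumulator).items()):
        if isinstance(value, (ListView, MapView)):
            specs[field_name] = (len(specs), type(value).__name__)
    return specs


if __name__ == "__main__":
    print(extract_data_views(CountDistinct()))
    # {'distinct': (0, 'MapView'), 'history': (1, 'ListView')}
```

Because the analysis needs a live accumulator, it happens after the function is instantiated, which is the point Timo's question raises about eager state declaration.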
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> 2. Serializability of functions: How do we ensure serializability of
>>>>>>>>>>> functions for catalog persistence? In the Scala/Java API, we would like
>>>>>>>>>>> to register classes instead of instances soon. This is the only way to
>>>>>>>>>>> store a function properly in a catalog, or we need some
>>>>>>>>>>> serialization/deserialization logic in the function interfaces to
>>>>>>>>>>> convert an instance to string properties.
>>>>>>>>>>>
>>>>>>>>>> The Python function will be serialized with CloudPickle anyway in the
>>>>>>>>>> Python API, as we need to transfer it to the Python worker, which can
>>>>>>>>>> then deserialize it for execution. The serialized Python function can
>>>>>>>>>> be stored in the catalog.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> 3. TableEnvironment: What is the signature of `register_function(self,
>>>>>>>>>>> name, function)`? Does it accept both a class and a function? Like
>>>>>>>>>>> `class Sum` and `def split()`? Could you add some examples for
>>>>>>>>>>> registering both kinds of functions?
>>>>>>>>>>>
>>>>>>>>>> What you mentioned is already supported. You can find an example in
>>>>>>>>>> the POC code:
>>>>>>>>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> 4. FunctionDefinition: Function definition is not a user-defined
>>>>>>>>>>> function definition. It is the highest interface for both user-defined
>>>>>>>>>>> and built-in functions. I'm not sure if getLanguage() should be part of
>>>>>>>>>>> this interface or one level down, which would be `UserDefinedFunction`.
>>>>>>>>>>> Built-in functions will never be implemented in a different language.
>>>>>>>>>>> In any case, I would vote for removing the UNKNOWN language, because it
>>>>>>>>>>> does not solve anything. Why should a user declare a function that the
>>>>>>>>>>> runtime cannot handle? I also find the term `JAVA` confusing for Scala
>>>>>>>>>>> users. How about `FunctionLanguage.JVM` instead?
>>>>>>>>>>>
>>>>>>>>>> Actually, we may have built-in Python functions in the future. For the
>>>>>>>>>> expression py_udf1(a, b) + py_udf2(c), if there is a built-in Python
>>>>>>>>>> function for the '+' operator, then we don't need to mix Java and
>>>>>>>>>> Python UDFs. In this way, we can improve the execution performance.
>>>>>>>>>> Removing FunctionLanguage.UNKNOWN and renaming FunctionLanguage.Java to
>>>>>>>>>> FunctionLanguage.JVM makes more sense to me.
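Jincheng's `py_udf1(a, b) + py_udf2(c)` argument can be made concrete with a small expression-tree check. The sketch assumes a hypothetical `FunctionLanguage` enum reflecting the renaming agreed on here (`JVM` instead of `JAVA`, no `UNKNOWN`) and a hypothetical `Call` node; it only reports which languages an expression needs, so an expression whose calls are all Python could in principle stay inside the Python worker.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Set


class FunctionLanguage(Enum):
    # Hypothetical enum following this thread: JVM covers Java and Scala, no UNKNOWN.
    JVM = "JVM"
    PYTHON = "PYTHON"


@dataclass
class Call:
    """Hypothetical function-call node; field references are omitted for brevity."""
    name: str
    language: FunctionLanguage
    args: List["Call"] = field(default_factory=list)


def languages_used(expr: Call) -> Set[FunctionLanguage]:
    """Collect the languages of every call in the expression tree."""
    found = {expr.language}
    for arg in expr.args:
        found |= languages_used(arg)
    return found


def example_expression(plus_language: FunctionLanguage) -> Call:
    """Build py_udf1(a, b) + py_udf2(c) with '+' implemented in the given language."""
    return Call("+", plus_language, [
        Call("py_udf1", FunctionLanguage.PYTHON),
        Call("py_udf2", FunctionLanguage.PYTHON),
    ])


if __name__ == "__main__":
    print(len(languages_used(example_expression(FunctionLanguage.JVM))))     # 2: mixed
    print(len(languages_used(example_expression(FunctionLanguage.PYTHON))))  # 1: Python only
```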
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> 5. Function characteristics: In the current design, function classes do
>>>>>>>>>>> not extend from any upper class. How can users declare characteristics
>>>>>>>>>>> that are present in `FunctionDefinition` like determinism,
>>>>>>>>>>> requirements, or soon also monotonism?
>>>>>>>>>>>
>>>>>>>>>> Actually, we have defined 'UserDefinedFunction', which is the base class
>>>>>>>>>> for all user-defined functions. We can define determinism,
>>>>>>>>>> requirements, etc. in this class. Currently, we already support
>>>>>>>>>> declaring determinism.
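Declaring a characteristic such as determinism on a common base class, as this answer describes, could look like the sketch below. The names `UserDefinedFunction.is_deterministic`, `RandomSuffix` and `can_precompute` are hypothetical; the helper reflects one common reason planners care about this flag: a call with only literal arguments may be evaluated once ahead of time only if the function is deterministic.

```python
import random


class UserDefinedFunction:
    """Hypothetical base class carrying function characteristics."""

    def is_deterministic(self) -> bool:
        # Deterministic by default; subclasses opt out explicitly.
        return True


class AddOne(UserDefinedFunction):
    def eval(self, i: int) -> int:
        return i + 1


class RandomSuffix(UserDefinedFunction):
    def eval(self, s: str) -> str:
        return f"{s}-{random.randint(0, 9)}"

    def is_deterministic(self) -> bool:
        return False


def can_precompute(func: UserDefinedFunction, all_args_literal: bool) -> bool:
    """A call may be folded into a constant only if its inputs and the function are fixed."""
    return all_args_literal and func.is_deterministic()


if __name__ == "__main__":
    print(can_precompute(AddOne(), True))        # True
    print(can_precompute(RandomSuffix(), True))  # False
```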
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Timo
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
>>>>>>>>>>>> Hi Jincheng, Fudian, and Aljoscha,
>>>>>>>>>>>> I am assuming the proposed Python UDX can also be applied to Flink
>>>>>>>>>>>> SQL. Is this correct? If yes, I would suggest titling the FLIP
>>>>>>>>>>>> "Flink Python User-Defined Function" or "Flink Python User-Defined
>>>>>>>>>>>> Function for Table".
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <[hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the feedback, Bowen!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Many thanks for creating the FLIP and bringing up the VOTE, Dian!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Aug 28, 2019 at 11:32 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have started a voting thread [1]. Thanks a lot for your help
>>>>>>>>>>>>>> with creating the FLIP, @Jincheng.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Bowen,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks a lot for your comments. I have replied to them in the design
>>>>>>>>>>>>>> doc. As the comments don't seem to affect the overall design, I won't
>>>>>>>>>>>>>> cancel the vote for now, and we can continue the discussion in the
>>>>>>>>>>>>>> design doc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Aug 28, 2019, at 11:05 AM, Bowen Li <[hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jincheng and Dian,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sorry for being late to the party. I took a glance at the proposal.
>>>>>>>>>>>>>>> It LGTM in general, and I left only a couple of comments.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Bowen
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks! It works.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Aug 27, 2019, at 10:55 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 10:52 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the kind tips and the offer of help. I definitely
>>>>>>>>>>>>>>>>>> need it!
>>>>>>>>>>>>>>>>>> Could you grant me write permission for Confluence? My ID: Dian Fu
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Aug 26, 2019, at 9:53 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for your feedback, Hequn & Dian.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
>>>>>>>>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you
>>>>>>>>>>>>>>>>>>> complete your first FLIP creation. Here are some tips:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
>>>>>>>>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP
>>>>>>>>>>>>>>>>>>>   Template [1] (it's better to know more about FLIPs by reading [2]).
>>>>>>>>>>>>>>>>>>> - Create the Flink Python UDF related JIRAs after the VOTE on the
>>>>>>>>>>>>>>>>>>>   FLIP is complete. (I think you can also bring up the VOTE thread,
>>>>>>>>>>>>>>>>>>>   if you want!)
>>>>>>>>>>>>>>>>>>> If you encounter any problems during this period, feel free to
>>>>>>>>>>>>>>>>>>> tell me so that we can solve them together. :)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>>>>>>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 11:54 AM Hequn Cheng <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> +1 for starting the vote.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks a lot for the discussion, Jincheng.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best, Hequn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> +1 to creating the FLIP and starting the VOTE on this feature.
>>>>>>>>>>>>>>>>>>>>> I'm willing to help create the FLIP if you don't mind. As I
>>>>>>>>>>>>>>>>>>>>> haven't created a FLIP before, it would be great if you could
>>>>>>>>>>>>>>>>>>>>> help with this. :)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions
>>>>>>>>>>>>>>>>>>>>>> and comments, I think it's better to initiate a vote to create a
>>>>>>>>>>>>>>>>>>>>>> FLIP for Apache Flink Python UDFs.
>>>>>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, Aug 15, 2019 at 12:54 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder
>>>>>>>>>>>>>>>>>>>>>>> about bundle processing.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I have added a description of how to perform bundle processing
>>>>>>>>>>>>>>>>>>>>>>> from the perspective of checkpoints and watermarks. Feel free to
>>>>>>>>>>>>>>>>>>>>>>> leave comments if anything is not described clearly.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 14, 2019 at 10:08 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint"[1]
>>>>>>>>>>>>>>>>>>>>>>>> in the design doc which talks about how to handle checkpoints.
>>>>>>>>>>>>>>>>>>>>>>>> However, I think you are right that we should talk more about
>>>>>>>>>>>>>>>>>>>>>>>> it, such as what bundle processing is, how it affects
>>>>>>>>>>>>>>>>>>>>>>>> checkpoints and watermarks, how to handle checkpoints and
>>>>>>>>>>>>>>>>>>>>>>>> watermarks, etc.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Aug 14, 2019, at 1:01 AM, Thomas Weise <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is very detailed,
>>>>>>>>>>>>>>>>>>>>>>>>> thorough and, for me as a Beam Flink runner contributor, easy to
>>>>>>>>>>>>>>>>>>>>>>>>> understand :)
>>>>>>>>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is the bundle
>>>>>>>>>>>>>>>>>>>>>>>>> processing. It is critically important for performance that
>>>>>>>>>>>>>>>>>>>>>>>>> multiple elements are processed in a bundle. The default bundle
>>>>>>>>>>>>>>>>>>>>>>>>> size in the Flink runner is 1s or 1000 elements, whichever comes
>>>>>>>>>>>>>>>>>>>>>>>>> first. And for streaming, you can find the logic necessary to
>>>>>>>>>>>>>>>>>>>>>>>>> align the bundle processing with watermarks and checkpointing
>>>>>>>>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
>>>>>>>>>>>>>>>>>>>>>>>>> Thomas
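The bundling rule Thomas describes (flush after 1000 elements or 1 second, whichever comes first, aligned with watermarks and checkpoints) can be sketched as a small buffer. This is a simplified illustration, not the logic of Beam's `ExecutableStageDoFnOperator`: `BundleBuffer`, its callback names, and the injectable millisecond clock are hypothetical, and the sketch simply finishes the open bundle before a watermark is forwarded or a checkpoint is taken.

```python
import time
from typing import Callable, List, Optional


class BundleBuffer:
    """Hypothetical count/time-bounded bundler for elements sent to a Python worker."""

    def __init__(self, process_bundle: Callable[[List[object]], None],
                 max_elements: int = 1000, max_millis: float = 1000.0,
                 clock: Optional[Callable[[], float]] = None) -> None:
        self._process_bundle = process_bundle
        self._max_elements = max_elements
        self._max_millis = max_millis
        # Millisecond clock; injectable so the time-based rule can be tested.
        self._clock = clock or (lambda: time.monotonic() * 1000.0)
        self._elements: List[object] = []
        self._bundle_start = 0.0

    def add(self, element: object) -> None:
        if not self._elements:
            self._bundle_start = self._clock()
        self._elements.append(element)
        self._maybe_flush()

    def on_timer(self) -> None:
        """Called periodically so a slow stream still closes bundles after max_millis."""
        self._maybe_flush()

    def before_watermark_or_checkpoint(self) -> None:
        """Finish the open bundle so its results precede the watermark or snapshot."""
        self.flush()

    def flush(self) -> None:
        if self._elements:
            bundle, self._elements = self._elements, []
            self._process_bundle(bundle)

    def _maybe_flush(self) -> None:
        if not self._elements:
            return
        too_many = len(self._elements) >= self._max_elements
        too_old = self._clock() - self._bundle_start >= self._max_millis
        if too_many or too_old:
            self.flush()


if __name__ == "__main__":
    bundles: List[List[object]] = []
    buffer = BundleBuffer(bundles.append)  # defaults: 1000 elements or 1000 ms
    for i in range(2500):
        buffer.add(i)
    buffer.before_watermark_or_checkpoint()
    print([len(b) for b in bundles])
```

Making the clock injectable keeps the "whichever comes first" rule testable without real sleeps.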
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already supported
>>>>>>>>>>>>>>>>>>>>>>>>>> and will be available in the upcoming 1.9 release.
>>>>>>>>>>>>>>>>>>>>>>>>>> As Python UDFs are very important for Python users, we'd like to start
>>>>>>>>>>>>>>>>>>>>>>>>>> the discussion about Python UDF support in the Python Table API.
>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek, Dian Fu and I have discussed this offline and have
>>>>>>>>>>>>>>>>>>>>>>>>>> drafted a design doc[1]. It includes the following items:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
>>>>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> As mentioned by many people in the previous discussion thread[2], a
>>>>>>>>>>>>>>>>>>>>>>>>>> portability framework was introduced in Apache Beam in its latest
>>>>>>>>>>>>>>>>>>>>>>>>>> releases. It provides well-defined, language-neutral data structures and
>>>>>>>>>>>>>>>>>>>>>>>>>> protocols for language-neutral user-defined function execution. This
>>>>>>>>>>>>>>>>>>>>>>>>>> design is based on Beam's portability framework. We will introduce how
>>>>>>>>>>>>>>>>>>>>>>>>>> to make use of Beam's portability framework for user-defined function
>>>>>>>>>>>>>>>>>>>>>>>>>> execution: data transmission, state access, checkpoint, metrics,
>>>>>>>>>>>>>>>>>>>>>>>>>> logging, etc.
>>>>>>>>>>>>>>>>>>>>>>>>>> Considering that the design relies on Beam's portability framework for
>>>>>>>>>>>>>>>>>>>>>>>>>> Python user-defined function execution and not all contributors in the
>>>>>>>>>>>>>>>>>>>>>>>>>> Flink community are familiar with it, we have built a prototype[3] as a
>>>>>>>>>>>>>>>>>>>>>>>>>> proof of concept and to make the design easier to understand.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
>>>>>>>>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc


Re: [DISCUSS] Flink Python User-Defined Function for Table API

Dian Fu-2
Hi all,

I have updated the FLIP, removed the content related to UDAF, and changed the title of the FLIP to "Flink Python User-Defined Stateless Function for Table". Does that make sense to you?

Regards,
Dian

> On Sep 6, 2019, at 6:09 PM, Dian Fu <[hidden email]> wrote:
>
> Hi all,
>
> Thanks a lot for the discussion here. It makes sense to limit the scope of this FLIP to only ScalarFunction. I'll update the FLIP and remove the content relating to UDAF.
>
> Thanks,
> Dian
>
>> On Sep 6, 2019, at 6:02 PM, jincheng sun <[hidden email]> wrote:
>>
>> Hi,
>>
>> Sure. To ensure the Flink 1.10 release, let's split the FLIPs and have
>> FLIP-58 cover only the stateless part.
>>
>> Cheers,
>> Jincheng
>>
>> On Fri, Sep 6, 2019 at 5:53 PM Aljoscha Krettek <[hidden email]> wrote:
>>
>>> Hi,
>>>
>>> Regarding stateful functions and MapView/DataView/ListView: I think it’s
>>> best to keep that for a later FLIP and focus on a more basic version.
>>> Supporting stateful functions, especially with MapView, can potentially be
>>> very slow, so we have to see what we can do there.
>>>
>>> For the method names, I don’t know. If FLIP-64 passes they have to be
>>> changed. So we could use the final names right away, but I’m also fine with
>>> using the old method names for now.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 5. Sep 2019, at 12:40, jincheng sun <[hidden email]> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> Thanks for your comments!
>>>>
>>>> Regarding the FLIP scope, it seems that we have agreed on the design of the stateless function support.
>>>> What do you think about starting the development of the stateless function support first and continuing the discussion of stateful function support?
>>>> Or do you think we should split the current FLIP into two FLIPs and discuss the stateful function support in another thread?
>>>>
>>>> Currently, the design of the Python DataView/MapView/ListView interfaces follows the Java/Scala naming conventions.
>>>> Of course, we can continue to discuss whether there are better solutions, e.g., using annotations.
>>>>
>>>> Regarding the magic logic to support DataView/MapView/ListView, it will be done by the framework and is transparent to users.
>>>> In my understanding, the magic logic is unavoidable no matter what the interfaces look like.
>>>>
>>>> Regarding the catalog support of Python functions:
>>>> 1) If it's stored in memory as a temporary object, just as you said, users can call TableEnvironment.register_function (which will change to register_temporary_function in FLIP-64).
>>>> 2) If it's persisted in external storage, users can call Catalog.create_function. There will be no API change, in my understanding.
>>>>
>>>> What do you think?
>>>> Best,
>>>> Jincheng
>>>>
>>>> On Thu, Sep 5, 2019 at 5:32 PM Aljoscha Krettek <[hidden email]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Another thing to consider is the scope of the FLIP. Currently, we try to support (stateful) AggregateFunctions. I have some concerns about whether or not DataView/MapView/ListView is a good interface because it requires quite some magic from the runners to make it work, such as messing with the TypeInformation and injecting objects at runtime. If the FLIP aims for the minimum of ScalarFunctions and the whole execution harness, that should be easier to agree on.
>>>>>
>>>>> Another point is the naming of the new methods. I think Timo hinted at the fact that we have to consider catalog support for functions. There is ongoing work about differentiating between temporary objects and objects that are stored in a catalog (FLIP-64 [1]). With this in mind, the method for registering functions should be called register_temporary_function() and so on. That is, unless we already want to think about mixing Python and Java functions in the catalog, which I think is outside the scope of this FLIP.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
>>>>>
>>>>>
>>>>>> On 5. Sep 2019, at 05:01, jincheng sun <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> That's a good point. So far, most of the code will live in the flink-python module, the rules and RelNodes will be put into both the blink and the flink planner modules, and some of the common interfaces required by the planners will be placed in flink-table-common. I think you are right, we should try to ensure that the changes for this feature are minimal. We will follow this principle when reviewing the PRs.
>>>>>>
>>>>>> Many thanks for your questions and the reminder!
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 4, 2019 at 8:58 PM Aljoscha Krettek <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Things look interesting so far!
>>>>>>>
>>>>>>> I had one question: Where will most of the support code for this live? Will this add the required code to flink-table-common or the different runners? Can we implement this in such a way that only a minimal amount of support code is required in the parts of the Table API (and Table API runners) that are not Python-specific?
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>> On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Hi Jincheng,
>>>>>>>>
>>>>>>>> 2. Serializability of functions: "#2 is very convenient for users" holds only until they hit the first backwards-compatibility issue; after that they will find it not so convenient anymore and will ask why the framework allowed storing such objects in a persistent storage. I don't want to be picky about it, but wanted to raise awareness that sometimes it is ok to limit use cases to guide users toward developing backwards-compatible programs.
>>>>>>>>
>>>>>>>> Thanks for the explanation of the remaining items. It sounds reasonable to me. Regarding the example with `getKind()`, I actually meant `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow users to override this property. And I think we should do something similar for the getLanguage property.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>> On 03.09.19 15:01, jincheng sun wrote:
>>>>>>>>> Hi Timo,
>>>>>>>>>
>>>>>>>>> Thanks for the quick reply! :)
>>>>>>>>> I have added more examples for #3 and #5 to the FLIP. Those are great suggestions!
>>>>>>>>>
>>>>>>>>> Regarding 2:
>>>>>>>>>
>>>>>>>>> There are two kinds of serialization in CloudPickle (which is different from Java):
>>>>>>>>> 1) For classes and functions that can be imported, CloudPickle only serializes the full path of the class or function (just like a Java class name).
>>>>>>>>> 2) For classes and functions that cannot be imported, CloudPickle serializes the full content of the class or function.
>>>>>>>>> For #2, this means that we cannot just store the full path of the class and function.
>>>>>>>>>
>>>>>>>>> The above serialization is recursive.
>>>>>>>>>
>>>>>>>>> However, there is indeed a backwards-compatibility problem when the module path of the parent class changes. But I think this is a rare case and acceptable. For example, in the Flink framework we never change the module path of user-facing interfaces if we want to keep backwards compatibility. For user code, if users change the interface of a UDF's parent class, they should re-register their functions.
>>>>>>>>>
>>>>>>>>> If we do not want to support #2, we can store the full path of the class and function; in that case we have no backwards-compatibility problem. But I think #2 is very convenient for users.
>>>>>>>>>
>>>>>>>>> What do you think?
>>>>>>>>>
>>>>>>>>> Regarding 4:
>>>>>>>>> As I mentioned earlier, there may be built-in Python functions, and I think language is a "function" concept. Function and language are orthogonal concepts.
>>>>>>>>> We may have R, Go, and other language functions in the future, not only user-defined but also built-in functions.
>>>>>>>>>
>>>>>>>>> You are right that users will not set this method; for Python functions, it will be set in the code-generated Java function by the framework. So I think we should declare getLanguage() in FunctionDefinition for now.
>>>>>>>>> (I'm not quite sure what you mean by saying that getKind() is final in UserDefinedFunction.)
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jincheng
>>>>>>>>>
>>>>>>>>> On Tue, Sep 3, 2019 at 6:01 PM Timo Walther <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>
>>>>>>>>>> thanks for your response.
>>>>>>>>>>
>>>>>>>>>> 2. Serializability of functions: Using some arbitrary serialization format for shipping a function to a worker sounds fine to me. But once we store functions in the catalog, we need to think about backwards compatibility and evolution of interfaces etc. I'm not sure if CloudPickle is the right long-term storage format for this. If we don't think about this in advance, we are basically violating the rule of our code quality guide [1] to never use Java serialization, just in the Python way: we would be using the RPC serialization for persistence.
>>>>>>>>>>
>>>>>>>>>> 3. TableEnvironment: Can you add some examples to the FLIP? API code like the following is not covered there:
>>>>>>>>>>
>>>>>>>>>> self.t_env.register_function("add_one", udf(lambda i: i + 1,
>>>>>>>>>>                                             DataTypes.BIGINT(),
>>>>>>>>>>                                             DataTypes.BIGINT()))
>>>>>>>>>> self.t_env.register_function("subtract_one", udf(SubtractOne(),
>>>>>>>>>>                                                  DataTypes.BIGINT(),
>>>>>>>>>>                                                  DataTypes.BIGINT()))
>>>>>>>>>> self.t_env.register_function("add", add)
>>>>>>>>>>
>>>>>>>>>> 4. FunctionDefinition: Your response still doesn't answer my question entirely. Why do we need FunctionDefinition.getLanguage() if this is a "user-defined function" concept and not a "function" concept? In any case, users should not be able to set this method. So it must be final in UserDefinedFunction, similar to getKind().
>>>>>>>>>>
>>>>>>>>>> 5. Function characteristics: If UserDefinedFunction is defined in Python, why is it not used in your example in FLIP-58? Could you extend the example to show how to specify these attributes in the FLIP?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
>>>>>>>>>>
>>>>>>>>>> On 02.09.19 15:35, jincheng sun wrote:
>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>
>>>>>>>>>>> Many thanks for your feedback. I would like to share my thoughts with you inline. :)
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jincheng
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 2, 2019 at 5:04 PM Timo Walther <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> the FLIP looks awesome. However, I would like to discuss the changes to the user-facing parts again. Some feedback:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. DataViews: With the current non-annotation design for DataViews, we cannot perform eager state declaration, right? At which point during execution do we know which state is required by the function? We need to instantiate the function first, right?
>>>>>>>>>>>>
>>>>>>>>>>> We will analyze the Python AggregateFunction and extract the DataViews used in it. This can be done by instantiating the Python AggregateFunction, creating an accumulator by calling the method create_accumulator, and then analyzing the created accumulator. This is actually similar to the way the Java AggregateFunction is handled in the codegen logic. The extracted DataViews can then be used to construct the StateDescriptors in the operator, i.e., the Java operator should hold the state spec and the state descriptor id, and the Python worker can access the state by specifying the corresponding state descriptor id.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> 2. Serializability of functions: How do we ensure serializability of functions for catalog persistence? In the Scala/Java API, we would like to register classes instead of instances soon. This is the only way to store a function properly in a catalog, or we need some serialization/deserialization logic in the function interfaces to convert an instance to string properties.
>>>>>>>>>>>>
>>>>>>>>>>> The Python function will be serialized with CloudPickle anyway in the Python API, as we need to transfer it to the Python worker, which can then deserialize it for execution. The serialized Python function can be stored in the catalog.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> 3. TableEnvironment: What is the signature of `register_function(self, name, function)`? Does it accept both a class and a function? Like `class Sum` and `def split()`? Could you add some examples for registering both kinds of functions?
>>>>>>>>>>>>
>>>>>>>>>>> What you mentioned is already supported. You can find an example in the POC code:
>>>>>>>>>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> 4. FunctionDefinition: Function definition is not a user-defined function definition. It is the highest interface for both user-defined and built-in functions. I'm not sure if getLanguage() should be part of this interface or one level down, which would be `UserDefinedFunction`. Built-in functions will never be implemented in a different language. In any case, I would vote for removing the UNKNOWN language, because it does not solve anything. Why should a user declare a function that the runtime cannot handle? I also find the term `JAVA` confusing for Scala users. How about `FunctionLanguage.JVM` instead?
>>>>>>>>>>>>
>>>>>>>>>>> Actually, we may have built-in Python functions in the future. Regarding the following expression: py_udf1(a, b) + py_udf2(c), if there is a built-in Python function for the '+' operator, then we don't need to mix Java and Python UDFs. In this way, we can improve the execution performance.
>>>>>>>>>>> Regarding removing FunctionLanguage.UNKNOWN and renaming FunctionLanguage.Java to FunctionLanguage.JVM, it makes more sense to me.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> 5. Function characteristics: In the current design, function classes do not extend from any upper class. How can users declare characteristics that are present in `FunctionDefinition` like determinism, requirements, or soon also monotonism?
>>>>>>>>>>>>
>>>>>>>>>>> Actually, we have defined 'UserDefinedFunction', which is the base class for all user-defined functions.
>>>>>>>>>>> We can define the determinism, requirements, etc. in this class.
>>>>>>>>>>> Currently, defining determinism is already supported.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
>>>>>>>>>>>>> Hi Jincheng, Fudian, and Aljoscha,
>>>>>>>>>>>>> I am assuming the proposed Python UDX can also be applied to Flink SQL. Is this correct? If yes, I would suggest titling the FLIP "Flink Python User-Defined Function" or "Flink Python User-Defined Function for Table".
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the feedback Bowen!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Many thanks, Dian, for creating the FLIP and bringing up the VOTE!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Aug 28, 2019 at 11:32 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have started a voting thread [1]. Thanks a lot for your help in creating the FLIP, @Jincheng.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Bowen,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks a lot for your comments. I have replied to you in the design doc. As the comments don't seem to affect the overall design, I won't cancel the vote for now, and we can continue the discussion in the design doc.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Aug 28, 2019, at 11:05 AM, Bowen Li <[hidden email]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jincheng and Dian,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Sorry for being late to the party. I took a glance at the proposal; it LGTM in general, and I left only a couple of comments.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Bowen
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks! It works.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Aug 27, 2019, at 10:55 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 10:52 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the kind tips and the offer of help. I definitely need it!
>>>>>>>>>>>>>>>>>>> Could you grant me write permission for Confluence? My ID: Dian Fu
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Aug 26, 2019, at 9:53 AM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for your feedback Hequn & Dian.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
>>>>>>>>>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you complete your first FLIP creation. Here are some tips:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
>>>>>>>>>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP Template [1]. (It's better to know more about FLIPs by reading [2].)
>>>>>>>>>>>>>>>>>>>> - Create the Flink Python UDF related JIRAs after the VOTE on the FLIP is complete. (I think you can also bring up the VOTE thread, if you want!)
>>>>>>>>>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell me so that we can solve them together. :)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>>>>>>>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 11:54 AM Hequn Cheng <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> +1 for starting the vote.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks Jincheng a lot for the discussion.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best, Hequn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> +1 to start creating the FLIP and the VOTE on this feature. I'm willing to help with creating the FLIP if you don't mind. As I haven't created a FLIP before, it would be great if you could help with this. :)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Aug 22, 2019, at 11:41 PM, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions and comments, I think it's better to initiate a vote to create a FLIP for Apache Flink Python UDFs.
>>>>>>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, Aug 15, 2019 at 12:54 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder about bundle processing.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I have added a description of how to perform bundle processing from the perspective of checkpoints and watermarks. Feel free to leave comments if anything is not described clearly.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 14, 2019 at 10:08 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint" [1] in the design doc which talks about how to handle checkpoints. However, I think you are right that we should talk more about it, such as what bundle processing is, how it affects checkpoints and watermarks, how to handle checkpoints and watermarks, etc.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Aug 14, 2019, at 1:01 AM, Thomas Weise <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is very detailed, thorough, and, for me as a Beam Flink runner contributor, easy to understand :)
>>>>>>>>>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is the bundle processing. It is critically important for performance that multiple elements are processed in a bundle. The default bundle size in the Flink runner is 1s or 1000 elements, whichever comes first. And for streaming, you can find the logic necessary to align the bundle processing with watermarks and checkpointing here:
>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> The Python Table API(without Python UDF support)
>>> has
>>>>>>>>>>>> already
>>>>>>>>>>>>>>>>> been
>>>>>>>>>>>>>>>>>>>>>>>>> supported
>>>>>>>>>>>>>>>>>>>>>>>>>>> and will be available in the coming release 1.9.
>>>>>>>>>>>>>>>>>>>>>>>>>>> As Python UDF is very important for Python users,
>>>>> we'd
>>>>>>>>>> like
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> start
>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion about the Python UDF support in the
>>>>> Python
>>>>>>>>>> Table
>>>>>>>>>>>>>>> API.
>>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek, Dian Fu and I have discussed
>>>>> offline
>>>>>>>>>> and
>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>>> drafted a
>>>>>>>>>>>>>>>>>>>>>>>>>>> design doc[1]. It includes the following items:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
>>>>>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function execution
>>> architecture.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> As mentioned by many guys in the previous
>>> discussion
>>>>>>>>>>>>>>> thread[2],
>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>>>> portability framework was introduced in Apache
>>> Beam
>>>>> in
>>>>>>>>>>>>>> latest
>>>>>>>>>>>>>>>>>>>>>>>>> releases. It
>>>>>>>>>>>>>>>>>>>>>>>>>>> provides well-defined, language-neutral data
>>>>>>> structures
>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> protocols
>>>>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>> language-neutral user-defined function execution.
>>>>> This
>>>>>>>>>>>>>> design
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>> based
>>>>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>>>>>>> Beam's portability framework. We will introduce
>>> how
>>>>> to
>>>>>>>>>> make
>>>>>>>>>>>>>>> use
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>> Beam's
>>>>>>>>>>>>>>>>>>>>>>>>>>> portability framework for user-defined function
>>>>>>>>>> execution:
>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>>>>>>>>>> transmission, state access, checkpoint, metrics,
>>>>>>> logging,
>>>>>>>>>>>>>> etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>> Considering that the design relies on Beam's
>>>>>>> portability
>>>>>>>>>>>>>>>>> framework
>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>>>> Python user-defined function execution and not all
>>>>> the
>>>>>>>>>>>>>>>>>>> contributors
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink community are familiar with Beam's
>>> portability
>>>>>>>>>>>>>>> framework,
>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>>>>> done a prototype[3] for proof of concept and also
>>>>>>> ease of
>>>>>>>>>>>>>>>>>>>>>>>>> understanding of
>>>>>>>>>>>>>>>>>>>>>>>>>>> the design.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Welcome any feedback.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>
>>> https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
>>>>>>>>>>>>>>>>>>>>>>>>>>> [3]
>>> https://github.com/dianfu/flink/commits/udf_poc
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>
>>>
>
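Jincheng's description of CloudPickle in the quoted 3 Sep message (importable classes and functions are stored by path, non-importable ones by value) and Timo's backwards-compatibility concern can be reproduced with the standard library's `pickle`, which only implements the by-reference half. This sketch does not use CloudPickle itself; the module name `my_udfs_v1` is made up and stands in for a user's UDF module that is later renamed.

```python
import pickle
import sys
import types


def make_udf_module(name):
    """Create and register an in-memory module holding one UDF (stand-in for user code)."""
    module = types.ModuleType(name)
    exec("def add_one(i):\n    return i + 1\n", module.__dict__)
    sys.modules[name] = module
    return module


def by_reference_roundtrip():
    module = make_udf_module("my_udfs_v1")
    payload = pickle.dumps(module.add_one)
    # Only "module path + function name" is stored, not the function body.
    stores_path_only = b"my_udfs_v1" in payload and b"add_one" in payload
    result_before = pickle.loads(payload)(41)

    # Simulate a later version of the user code in which the module was renamed.
    del sys.modules["my_udfs_v1"]
    try:
        pickle.loads(payload)
        breaks_after_rename = False
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        breaks_after_rename = True
    return stores_path_only, result_before, breaks_after_rename


def lambda_needs_by_value():
    """Plain pickle cannot store a lambda by reference; CloudPickle would store it by value."""
    try:
        pickle.dumps(lambda i: i + 1)
        return False
    except (pickle.PicklingError, AttributeError):
        return True


print(by_reference_roundtrip())  # (True, 42, True)
print(lambda_needs_by_value())   # True
```

The first function shows the path dependence Timo is worried about once such bytes sit in a catalog; the second shows the case (#2 in Jincheng's list) that only a by-value serializer such as CloudPickle can handle.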

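Jincheng's answer to Timo's DataView question in the quoted 2 Sep exchange describes finding the required state by instantiating the AggregateFunction, calling `create_accumulator`, and inspecting the result. A minimal sketch of that inspection step follows; the `DataView`/`MapView`/`ListView` marker classes, the list-shaped accumulator, and the `agg{n}_acc{m}` state-id scheme are all hypothetical and are not the PyFlink classes.

```python
from typing import List, NamedTuple


class DataView:
    """Hypothetical marker base class for state-backed accumulator fields."""


class MapView(DataView):
    def __init__(self):
        self.entries = {}


class ListView(DataView):
    def __init__(self):
        self.items = []


class StateSpec(NamedTuple):
    state_id: str     # id shared by the Java operator and the Python worker
    field_index: int  # position of the DataView inside the accumulator
    view_type: str    # "MapView" or "ListView"


class CountDistinct:
    """Hypothetical Python AggregateFunction: accumulator is [count, seen_values]."""

    def create_accumulator(self):
        return [0, MapView()]


class CollectAll:
    def create_accumulator(self):
        return [ListView()]


def extract_state_specs(agg_function, agg_index: int) -> List[StateSpec]:
    """Create one accumulator and turn every DataView field into a state spec."""
    accumulator = agg_function.create_accumulator()
    specs = []
    for field_index, value in enumerate(accumulator):
        if isinstance(value, DataView):
            state_id = f"agg{agg_index}_acc{field_index}"  # made-up id scheme
            specs.append(StateSpec(state_id, field_index, type(value).__name__))
    return specs


for index, fn in enumerate([CountDistinct(), CollectAll()]):
    print(extract_state_specs(fn, index))
# [StateSpec(state_id='agg0_acc1', field_index=1, view_type='MapView')]
# [StateSpec(state_id='agg1_acc0', field_index=0, view_type='ListView')]
```

In this sketch the specs are known before any record is processed, but only after the function has been instantiated, which is exactly the ordering Timo asked about.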

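Thomas's point about bundle processing in the quoted 14 Aug message (a bundle closes after 1s or 1000 elements, whichever comes first, and must be aligned with watermarks and checkpoints) can be sketched as a small buffer. This is an illustration under stated assumptions, not Beam's `ExecutableStageDoFnOperator` logic: `BundleBuffer`, its callback, and the injectable clock are hypothetical, and the time limit is only checked when an element arrives, whereas a real operator would also register a timer.

```python
import time
from typing import Callable, List, Optional


class BundleBuffer:
    """Hypothetical sketch: groups elements into bundles for the Python worker."""

    def __init__(self, process_bundle: Callable[[List[object]], None],
                 max_elements: int = 1000, max_millis: int = 1000,
                 clock: Callable[[], float] = time.monotonic):
        self._process_bundle = process_bundle
        self._max_elements = max_elements
        self._max_seconds = max_millis / 1000.0
        self._clock = clock
        self._buffer: List[object] = []
        self._bundle_start: Optional[float] = None

    def add(self, element) -> None:
        if not self._buffer:
            self._bundle_start = self._clock()
        self._buffer.append(element)
        # Close the bundle on size or age, whichever limit is hit first
        # (age is only checked on arrival here; a real operator would also use a timer).
        if (len(self._buffer) >= self._max_elements
                or self._clock() - self._bundle_start >= self._max_seconds):
            self.finish_bundle()

    def finish_bundle(self) -> None:
        if self._buffer:
            bundle, self._buffer = self._buffer, []
            self._bundle_start = None
            self._process_bundle(bundle)

    def on_watermark(self, watermark, emit_watermark: Callable[[object], None]) -> None:
        # Results for buffered elements must be emitted before the watermark overtakes them.
        self.finish_bundle()
        emit_watermark(watermark)

    def prepare_snapshot(self) -> None:
        # Drain in-flight elements before the checkpoint so none exist only in the Python worker.
        self.finish_bundle()


# Usage with a fake clock: bundles of at most 3 elements or 1 second.
bundles, watermarks, now = [], [], [0.0]
buffer = BundleBuffer(bundles.append, max_elements=3, max_millis=1000, clock=lambda: now[0])
for x in range(7):
    buffer.add(x)
print(bundles)  # [[0, 1, 2], [3, 4, 5]]
now[0] = 2.0
buffer.add(7)   # bundle [6, 7] is older than 1s, so it is flushed
buffer.add(8)
buffer.on_watermark(100, watermarks.append)
print(bundles, watermarks)  # [[0, 1, 2], [3, 4, 5], [6, 7], [8]] [100]
```

Flushing inside `on_watermark` and `prepare_snapshot` is what keeps bundling invisible to event-time and checkpoint semantics; the size and time limits only trade latency for throughput.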
Re: [DISCUSS] Flink Python User-Defined Function for Table API

Aljoscha Krettek-2
Hi,

Thanks for the quick response! I think this looks good now and it should be something that everyone can agree on as a first step.

Best,
Aljoscha
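The catalog split that Jincheng describes in the quoted 5 Sep message (temporary functions registered in memory through the TableEnvironment, persistent ones created through Catalog.create_function) can be pictured as a two-level lookup. The sketch is hypothetical: `FunctionResolver` and its methods are invented names, the "temporary shadows persistent" precedence is an assumption for illustration, and the persisted form is a plain `pickle` payload, which carries exactly the path dependence Timo warned about.

```python
import pickle
from typing import Callable, Dict


class FunctionResolver:
    """Hypothetical two-level function lookup: session-scoped first, then catalog."""

    def __init__(self):
        self._temporary: Dict[str, Callable] = {}  # in memory, lost with the session
        self._persisted: Dict[str, bytes] = {}     # stand-in for external catalog storage

    def add_temporary(self, name: str, fn: Callable) -> None:
        self._temporary[name] = fn

    def add_persistent(self, name: str, fn: Callable) -> None:
        # Serialized on write; plain pickle stores importable functions by path only.
        self._persisted[name] = pickle.dumps(fn)

    def lookup(self, name: str) -> Callable:
        if name in self._temporary:  # assumed precedence: temporary shadows persistent
            return self._temporary[name]
        if name in self._persisted:
            return pickle.loads(self._persisted[name])
        raise KeyError(f"function {name!r} not found")


resolver = FunctionResolver()
resolver.add_persistent("my_abs", abs)              # builtins.abs pickles by reference
print(resolver.lookup("my_abs")(-3))                # 3
resolver.add_temporary("my_abs", lambda x: x * 10)  # the temporary entry shadows the catalog one
print(resolver.lookup("my_abs")(-3))                # -30
```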

> On 6. Sep 2019, at 12:22, Dian Fu <[hidden email]> wrote:
>
> Hi all,
>
> I have updated the FLIP, removed the content related to UDAF, and changed the title of the FLIP to "Flink Python User-Defined Stateless Function for Table". Does it make sense to you?
>
> Regards,
> Dian
>
>> On Sep 6, 2019, at 6:09 PM, Dian Fu <[hidden email]> wrote:
>>
>> Hi all,
>>
>> Thanks a lot for the discussion here. It makes sense to limit the scope of this FLIP to only ScalarFunction. I'll update the FLIP and remove the content relating to UDAF.
>>
>> Thanks,
>> Dian
>>
>>> On Sep 6, 2019, at 6:02 PM, jincheng sun <[hidden email]> wrote:
>>>
>>> Hi,
>>>
>>> Sure. To ensure the Flink 1.10 release, let's split the FLIPs and have
>>> FLIP-58 cover only the stateless part.
>>>
>>> Cheers,
>>> Jincheng
>>>
>>> On Fri, Sep 6, 2019 at 5:53 PM Aljoscha Krettek <[hidden email]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Regarding stateful functions and MapView/DataView/ListView: I think it’s
>>>> best to keep that for a later FLIP and focus on a more basic version.
>>>> Supporting stateful functions, especially with MapView, can potentially be
>>>> very slow, so we have to see what we can do there.
>>>>
>>>> For the method names, I don’t know. If FLIP-64 passes they have to be
>>>> changed. So we could use the final names right away, but I’m also fine with
>>>> using the old method names for now.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>> On 5. Sep 2019, at 12:40, jincheng sun <[hidden email]> wrote:
>>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> Thanks for your comments!
>>>>>
>>>>> Regarding the FLIP scope, it seems that we have agreed on the design of
>>>>> the stateless function support.
>>>>> What do you think about starting the development of the stateless function
>>>>> support first and continuing the discussion of the stateful function support?
>>>>> Or do you think we should split the current FLIP into two FLIPs and discuss
>>>>> the stateful function support in another thread?
>>>>>
>>>>> Currently, the design of the Python DataView/MapView/ListView interfaces
>>>>> follows the Java/Scala naming conventions.
>>>>> Of course, we can continue to discuss whether there are better solutions,
>>>>> e.g., using annotations.
>>>>>
>>>>> Regarding the magic logic to support DataView/MapView/ListView, it will
>>>>> be done by the framework and is transparent to users.
>>>>> In my understanding, the magic logic is unavoidable no matter what the
>>>>> interfaces look like.
>>>>>
>>>>> Regarding the catalog support of Python functions:
>>>>> 1) If it's stored in memory as a temporary object, just as you said, users
>>>>> can call TableEnvironment.register_function (which will change to
>>>>> register_temporary_function in FLIP-64).
>>>>> 2) If it's persisted in external storage, users can call
>>>>> Catalog.create_function. In my understanding, there will be no API change.
>>>>>
>>>>> What do you think?
>>>>> Best,
>>>>> Jincheng
>>>>>
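The two registration paths Jincheng describes (a temporary function kept in memory via the TableEnvironment versus a function persisted through Catalog.create_function) can be illustrated with a minimal, self-contained sketch. The names `FunctionRegistry`, `register_temporary_function`, `create_function` and `lookup` are hypothetical stand-ins, not PyFlink's API; persisting only a "module:qualname" string and letting a temporary function shadow a catalog one of the same name are assumptions of this sketch.

```python
import importlib
from typing import Callable, Dict


class FunctionRegistry:
    """Hypothetical registry separating temporary and catalog functions."""

    def __init__(self) -> None:
        self._temporary: Dict[str, Callable] = {}
        # Stand-in for external catalog storage: only strings are kept here.
        self._catalog: Dict[str, str] = {}

    def register_temporary_function(self, name: str, func: Callable) -> None:
        # Lives only in memory, so any callable (even a lambda) is fine.
        self._temporary[name] = func

    def create_function(self, name: str, import_path: str) -> None:
        # Persist a "module:qualname" string rather than a live object.
        self._catalog[name] = import_path

    def lookup(self, name: str) -> Callable:
        # Assumption of this sketch: a temporary function shadows a catalog one.
        if name in self._temporary:
            return self._temporary[name]
        module_name, _, qualname = self._catalog[name].partition(":")
        obj = importlib.import_module(module_name)
        for part in qualname.split("."):
            obj = getattr(obj, part)
        return obj


registry = FunctionRegistry()
registry.register_temporary_function("add_one", lambda i: i + 1)
registry.create_function("square_root", "math:sqrt")
print(registry.lookup("add_one")(41), registry.lookup("square_root")(16.0))  # 42 4.0
```

Storing only an import path keeps the persisted form independent of any particular serialization format, which is the concern raised in the serialization discussion of this thread.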
>>>>> On Thu, Sep 5, 2019 at 5:32 PM Aljoscha Krettek <[hidden email]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Another thing to consider is the scope of the FLIP. Currently, we try to
>>>>>> support (stateful) AggregateFunctions. I have some concerns about whether
>>>>>> or not DataView/MapView/ListView is a good interface, because it requires
>>>>>> quite some magic from the runners to make it work, such as messing with the
>>>>>> TypeInformation and injecting objects at runtime. If the FLIP aims for the
>>>>>> minimum of ScalarFunctions and the whole execution harness, that should be
>>>>>> easier to agree on.
>>>>>>
>>>>>> Another point is the naming of the new methods. I think Timo hinted at the
>>>>>> fact that we have to consider catalog support for functions. There is
>>>>>> ongoing work on differentiating between temporary objects and objects
>>>>>> that are stored in a catalog (FLIP-64 [1]). With this in mind, the method
>>>>>> for registering functions should be called register_temporary_function()
>>>>>> and so on, unless we want to already think about mixing Python and Java
>>>>>> functions in the catalog, which is outside the scope of this FLIP, I think.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
>>>>>>
>>>>>>
>>>>>>> On 5. Sep 2019, at 05:01, jincheng sun <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> That's a good point. So far, most of the code will live in the flink-python
>>>>>>> module, the rules and RelNodes will be put into both the blink and the flink
>>>>>>> planner modules, and some common interfaces required by the planners will
>>>>>>> be placed in flink-table-common. I think you are right that we should try
>>>>>>> to keep the changes for this feature minimal. We will follow this principle
>>>>>>> in more detail when reviewing the PRs.
>>>>>>>
>>>>>>> Many thanks for your questions and the reminder!
>>>>>>>
>>>>>>> Best,
>>>>>>> Jincheng
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Sep 4, 2019 at 8:58 PM Aljoscha Krettek <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Things look interesting so far!
>>>>>>>>
>>>>>>>> I had one question: Where will most of the support code for this live?
>>>>>>>> Will this add the required code to flink-table-common or the different
>>>>>>>> runners? Can we implement this in such a way that only a minimal amount of
>>>>>>>> support code is required in the parts of the Table API (and Table API
>>>>>>>> runners) that are not Python-specific?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>>> On 4. Sep 2019, at 14:14, Timo Walther <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Jincheng,
>>>>>>>>>
>>>>>>>>> 2. Serializability of functions: "#2 is very convenient for users" only
>>>>>>>>> holds until they have the first backwards-compatibility issue; after that
>>>>>>>>> they will find it not so convenient anymore and will ask why the framework
>>>>>>>>> allowed storing such objects in persistent storage. I don't want to be
>>>>>>>>> picky about it, but I wanted to raise awareness that sometimes it is OK to
>>>>>>>>> limit use cases to guide users toward developing backwards-compatible
>>>>>>>>> programs.
>>>>>>>>>
>>>>>>>>> Thanks for the explanation of the remaining items. It sounds reasonable
>>>>>>>>> to me. Regarding the example with `getKind()`, I actually meant
>>>>>>>>> `org.apache.flink.table.functions.ScalarFunction#getKind`: we don't allow
>>>>>>>>> users to override this property, and I think we should do something
>>>>>>>>> similar for the getLanguage property.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>> On 03.09.19 15:01, jincheng sun wrote:
>>>>>>>>>> Hi Timo,
>>>>>>>>>>
>>>>>>>>>> Thanks for the quick reply! :)
>>>>>>>>>> I have added more examples for #3 and #5 to the FLIP. Those are great
>>>>>>>>>> suggestions!
>>>>>>>>>>
>>>>>>>>>> Regarding 2:
>>>>>>>>>>
>>>>>>>>>> There are two kinds of serialization in CloudPickle (which is different
>>>>>>>>>> from Java):
>>>>>>>>>> 1) For classes and functions that can be imported, CloudPickle only
>>>>>>>>>> serializes the full path of the class or function (just like a Java class
>>>>>>>>>> name).
>>>>>>>>>> 2) For classes and functions that cannot be imported, CloudPickle
>>>>>>>>>> serializes the full content of the class or function.
>>>>>>>>>> Because of #2, we cannot just store the full path of the class and
>>>>>>>>>> function.
>>>>>>>>>>
>>>>>>>>>> The above serialization is recursive.
>>>>>>>>>>
>>>>>>>>>> However, there is indeed a backwards-compatibility problem when the
>>>>>>>>>> module path of the parent class changes. But I think this is a rare case
>>>>>>>>>> and acceptable. For example, in the Flink framework we never change the
>>>>>>>>>> module path of user-facing interfaces if we want to keep backwards
>>>>>>>>>> compatibility. For user code, if users change the interface of a UDF's
>>>>>>>>>> parent class, they should re-register their functions.
>>>>>>>>>>
>>>>>>>>>> If we do not want to support #2, we can store the full path of the class
>>>>>>>>>> and function; in that case we have no backwards-compatibility problem. But
>>>>>>>>>> I think #2 is very convenient for users.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Regarding 4:
>>>>>>>>>> As I mentioned earlier, there may be built-in Python functions, and I
>>>>>>>>>> think language is a "function" concept. Function and language are
>>>>>>>>>> orthogonal concepts.
>>>>>>>>>> We may have R, Go and other language functions in the future, not only
>>>>>>>>>> user-defined but also built-in functions.
>>>>>>>>>>
>>>>>>>>>> You are right that users will not set this method. For Python functions,
>>>>>>>>>> it will be set by the framework in the code-generated Java function. So I
>>>>>>>>>> think we should declare getLanguage() in FunctionDefinition for now.
>>>>>>>>>> (I'm not quite sure what you mean by saying that getKind() is final in
>>>>>>>>>> UserDefinedFunction?)
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jincheng
>>>>>>>>>>
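The two serialization modes described above (by reference for importable classes and functions, by value otherwise) and the backwards-compatibility concern can be illustrated with a small standard-library sketch. This is not CloudPickle's implementation: the helpers `is_importable`, `serialize_udf` and `deserialize_udf` are hypothetical, and the by-value mode here only ships a function's code object via `marshal` (no closures, defaults or globals), whereas CloudPickle handles far more.

```python
import importlib
import marshal
import math
import pickle
import types


def is_importable(func):
    """True if `func` can be found again via its module and qualified name."""
    try:
        obj = importlib.import_module(func.__module__)
    except (ImportError, AttributeError, TypeError, ValueError):
        return False
    for part in func.__qualname__.split("."):
        obj = getattr(obj, part, None)
        if obj is None:
            return False
    return obj is func


def serialize_udf(func):
    """Hypothetical two-mode serializer (not CloudPickle's real code)."""
    if is_importable(func):
        # Mode 1: only the import path is stored, like a Java class name.
        return pickle.dumps(("by_reference", func.__module__, func.__qualname__))
    # Mode 2: the function body itself (its code object) is stored.
    return pickle.dumps(("by_value", marshal.dumps(func.__code__)))


def deserialize_udf(payload):
    kind, *rest = pickle.loads(payload)
    if kind == "by_reference":
        module_name, qualname = rest
        obj = importlib.import_module(module_name)
        for part in qualname.split("."):
            obj = getattr(obj, part)
        return obj
    # Rebuild a function from its code object with empty globals.
    return types.FunctionType(marshal.loads(rest[0]), {})


by_ref = serialize_udf(math.sqrt)
by_val = serialize_udf(lambda i: i + 1)
print(pickle.loads(by_ref)[0], deserialize_udf(by_ref)(9.0))  # by_reference 3.0
print(pickle.loads(by_val)[0], deserialize_udf(by_val)(41))   # by_value 42
```

If the `math` module were renamed, only the by-reference payload would break, which is the module-path problem mentioned above. Conversely, `marshal` output is tied to the Python version, so a by-value payload carries its own compatibility risk as long-term catalog storage.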
>>>>>>>>>> On Tue, Sep 3, 2019 at 6:01 PM Timo Walther <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>
>>>>>>>>>>> thanks for your response.
>>>>>>>>>>>
>>>>>>>>>>> 2. Serializability of functions: Using some arbitrary serialization
>>>>>>>>>>> format for shipping a function to the workers sounds fine to me. But once
>>>>>>>>>>> we store functions in the catalog, we need to think about backwards
>>>>>>>>>>> compatibility and evolution of interfaces, etc. I'm not sure if
>>>>>>>>>>> CloudPickle is the right long-term storage format for this. If we don't
>>>>>>>>>>> think about this in advance, we are basically violating the rule in our
>>>>>>>>>>> code quality guide [1] to never use Java Serialization, just in the
>>>>>>>>>>> Python way: we are using the RPC serialization for persistence.
>>>>>>>>>>>
>>>>>>>>>>> 3. TableEnvironment: Can you add some examples to the FLIP? API code
>>>>>>>>>>> like the following is not covered there:
>>>>>>>>>>>
>>>>>>>>>>> self.t_env.register_function(
>>>>>>>>>>>     "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))
>>>>>>>>>>> self.t_env.register_function(
>>>>>>>>>>>     "subtract_one", udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT()))
>>>>>>>>>>> self.t_env.register_function("add", add)
>>>>>>>>>>>
>>>>>>>>>>> 4. FunctionDefinition: Your response still doesn't answer my question
>>>>>>>>>>> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
>>>>>>>>>>> "user-defined function" concept and not a "function" concept? In any
>>>>>>>>>>> case, users should not be able to set this method, so it must be final
>>>>>>>>>>> in UserDefinedFunction, similar to getKind().
>>>>>>>>>>>
>>>>>>>>>>> 5. Function characteristics: If UserDefinedFunction is defined in
>>>>>>>>>>> Python, why is it not used in your example in FLIP-58? Could you extend
>>>>>>>>>>> the example to show how to specify these attributes in the FLIP?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Timo
>>>>>>>>>>>
>>>>>>>>>>> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
>>>>>>>>>>>
>>>>>>>>>>> On 02.09.19 15:35, jincheng sun wrote:
>>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>>
>>>>>>>>>>>> Many thanks for your feedback. I would like to share my thoughts with
>>>>>>>>>>>> you inline. :)
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 2, 2019 at 5:04 PM Timo Walther <[hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> the FLIP looks awesome. However, I would like to discuss the changes to
>>>>>>>>>>>>> the user-facing parts again. Some feedback:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. DataViews: With the current non-annotation design for DataViews, we
>>>>>>>>>>>>> cannot perform eager state declaration, right? At which point during
>>>>>>>>>>>>> execution do we know which state is required by the function? We need to
>>>>>>>>>>>>> instantiate the function first, right?
>>>>>>>>>>>>>
>>>>>>>>>>>>>> We will analyze the Python AggregateFunction and extract the DataViews
>>>>>>>>>>>> used in it. This can be done by instantiating the Python
>>>>>>>>>>>> AggregateFunction, creating an accumulator by calling its
>>>>>>>>>>>> create_accumulator method, and then analyzing the created accumulator.
>>>>>>>>>>>> This is actually similar to the way the codegen logic processes a Java
>>>>>>>>>>>> AggregateFunction. The extracted DataViews can then be used to construct
>>>>>>>>>>>> the StateDescriptors in the operator, i.e., the Java operator holds the
>>>>>>>>>>>> state spec and the state descriptor ID, and the Python worker can access
>>>>>>>>>>>> the state by specifying the corresponding state descriptor ID.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
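The extraction step described above (instantiate the aggregate function, call `create_accumulator`, inspect the accumulator for DataViews, and give each one a state descriptor ID) can be sketched in plain Python. `DataView`, `MapView`, `ListView`, `WordCountAgg` and `extract_state_specs` are hypothetical stand-ins, and representing a state spec as a `(field name, view type)` pair keyed by an integer ID is an assumption of this sketch rather than the FLIP's actual format.

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple


class DataView:
    """Hypothetical marker base class for state-backed views."""


class MapView(DataView):
    pass


class ListView(DataView):
    pass


@dataclass
class WordCountAccumulator:
    counts: MapView = field(default_factory=MapView)
    seen_words: ListView = field(default_factory=ListView)
    total: int = 0  # plain value, kept inside the accumulator itself


class WordCountAgg:
    """Hypothetical Python AggregateFunction; only the relevant method is shown."""

    def create_accumulator(self) -> WordCountAccumulator:
        return WordCountAccumulator()


def extract_state_specs(agg_function) -> Dict[int, Tuple[str, str]]:
    """Create an accumulator and give every DataView field a state ID."""
    accumulator = agg_function.create_accumulator()
    specs: Dict[int, Tuple[str, str]] = {}
    # Sorting by field name keeps the IDs stable for both JVM and Python sides.
    for name, value in sorted(vars(accumulator).items()):
        if isinstance(value, DataView):
            specs[len(specs)] = (name, type(value).__name__)
    return specs


print(extract_state_specs(WordCountAgg()))
# {0: ('counts', 'MapView'), 1: ('seen_words', 'ListView')}
```

The sketch also makes Timo's point visible: the required state is only known after the function has been instantiated and an accumulator has been created.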
>>>>>>>>>>>>> 2. Serializability of functions: How do we ensure serializability of
>>>>>>>>>>>>> functions for catalog persistence? In the Scala/Java API, we would like
>>>>>>>>>>>>> to register classes instead of instances soon. This is the only way to
>>>>>>>>>>>>> store a function properly in a catalog; otherwise we need some
>>>>>>>>>>>>> serialization/deserialization logic in the function interfaces to
>>>>>>>>>>>>> convert an instance to string properties.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Python function will be serialized with CloudPickle anyway in the
>>>>>>>>>>>> Python API, as we need to transfer it to the Python worker, which can then
>>>>>>>>>>>> deserialize it for execution. The serialized Python function can be
>>>>>>>>>>>> stored in the catalog.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> 3. TableEnvironment: What is the signature of `register_function(self,
>>>>>>>>>>>>> name, function)`? Does it accept both a class and a function, like `class
>>>>>>>>>>>>> Sum` and `def split()`? Could you add some examples for registering both
>>>>>>>>>>>>> kinds of functions?
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What you mentioned is already supported. You can find an example in the
>>>>>>>>>>>> POC code:
>>>>>>>>>>>> https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26
>>>>>>>>>>>>
>>>>>>>>>>>>
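What "accepting both" could mean for a `register_function(name, function)` signature is sketched below: a plain function or lambda, a class with an `eval` method, and an instance of such a class are all normalized into one callable. `ScalarFunction`, `SubtractOne` and `TableEnvironmentSketch` are hypothetical, self-contained stand-ins. In the snippet Timo quotes, functions are additionally wrapped with `udf(...)` together with their data types, which this sketch omits.

```python
from typing import Any, Callable, Dict


class ScalarFunction:
    """Hypothetical base class: subclasses implement eval()."""

    def eval(self, *args: Any) -> Any:
        raise NotImplementedError


class SubtractOne(ScalarFunction):
    def eval(self, i: int) -> int:
        return i - 1


def add(i: int, j: int) -> int:
    return i + j


class TableEnvironmentSketch:
    """Hypothetical stand-in for a TableEnvironment's function registry."""

    def __init__(self) -> None:
        self._functions: Dict[str, Callable[..., Any]] = {}

    def register_function(self, name: str, function: Any) -> None:
        """Accept a ScalarFunction class or instance, or any plain callable."""
        if isinstance(function, type) and issubclass(function, ScalarFunction):
            function = function()  # a class was passed: instantiate it
        if isinstance(function, ScalarFunction):
            self._functions[name] = function.eval
        elif callable(function):
            self._functions[name] = function  # def or lambda
        else:
            raise TypeError(f"{name!r} is neither a ScalarFunction nor callable")

    def call(self, name: str, *args: Any) -> Any:
        return self._functions[name](*args)


t_env = TableEnvironmentSketch()
t_env.register_function("add_one", lambda i: i + 1)      # lambda
t_env.register_function("subtract_one", SubtractOne())  # instance
t_env.register_function("subtract_cls", SubtractOne)    # class
t_env.register_function("add", add)                     # def
print(t_env.call("add_one", 1), t_env.call("subtract_one", 1),
      t_env.call("subtract_cls", 5), t_env.call("add", 1, 2))  # 2 0 4 3
```

Accepting a class (rather than only an instance) is the variant Timo mentions for the Scala/Java API, since a class can later be re-instantiated from its name.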
>>>>>>>>>>>>> 4. FunctionDefinition: A function definition is not a user-defined
>>>>>>>>>>>>> function definition. It is the highest interface for both user-defined
>>>>>>>>>>>>> and built-in functions. I'm not sure if getLanguage() should be part of
>>>>>>>>>>>>> this interface or one level down, which would be `UserDefinedFunction`.
>>>>>>>>>>>>> Built-in functions will never be implemented in a different language. In
>>>>>>>>>>>>> any case, I would vote for removing the UNKNOWN language, because it
>>>>>>>>>>>>> does not solve anything. Why should a user declare a function that the
>>>>>>>>>>>>> runtime cannot handle? I also find the term `JAVA` confusing for Scala
>>>>>>>>>>>>> users. How about `FunctionLanguage.JVM` instead?
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Actually, we may have built-in Python functions in the future. Consider
>>>>>>>>>>>> the expression py_udf1(a, b) + py_udf2(c): if there is a built-in Python
>>>>>>>>>>>> function for the '+' operator, then we don't need to mix Java and Python
>>>>>>>>>>>> UDFs. In this way, we can improve the execution performance.
>>>>>>>>>>>> Regarding removing FunctionLanguage.UNKNOWN and renaming
>>>>>>>>>>>> FunctionLanguage.JAVA to FunctionLanguage.JVM, that makes sense to me.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
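Jincheng's argument about `py_udf1(a, b) + py_udf2(c)` can be made concrete with a small sketch of how a planner might decide whether an expression tree can be evaluated entirely in one Python worker. The `FunctionLanguage` enum uses the `JVM`/`PYTHON` members discussed above without `UNKNOWN`; the `Field`/`Call` expression classes and the `runs_entirely_in_python` helper are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Tuple, Union


class FunctionLanguage(Enum):
    JVM = "JVM"
    PYTHON = "PYTHON"


@dataclass(frozen=True)
class Field:
    name: str


@dataclass(frozen=True)
class Call:
    function: str
    language: FunctionLanguage
    args: Tuple["Expr", ...]


Expr = Union[Field, Call]


def runs_entirely_in_python(expr: Expr) -> bool:
    """True if every function call in the tree is a Python function."""
    if isinstance(expr, Field):
        return True  # plain field references need no function at all
    return expr.language is FunctionLanguage.PYTHON and all(
        runs_entirely_in_python(arg) for arg in expr.args
    )


def plus(language: FunctionLanguage) -> Call:
    """Build py_udf1(a, b) + py_udf2(c) with '+' implemented in `language`."""
    udf1 = Call("py_udf1", FunctionLanguage.PYTHON, (Field("a"), Field("b")))
    udf2 = Call("py_udf2", FunctionLanguage.PYTHON, (Field("c"),))
    return Call("+", language, (udf1, udf2))


# With a JVM '+', intermediate results must cross between the JVM and Python.
print(runs_entirely_in_python(plus(FunctionLanguage.JVM)))     # False
# With a (hypothetical) built-in Python '+', the whole tree stays in Python.
print(runs_entirely_in_python(plus(FunctionLanguage.PYTHON)))  # True
```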
>>>>>>>>>>>>> 5. Function characteristics: In the current design, function classes do
>>>>>>>>>>>>> not extend from any upper class. How can users declare characteristics
>>>>>>>>>>>>> that are present in `FunctionDefinition`, like determinism, requirements,
>>>>>>>>>>>>> or soon also monotonicity?
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Actually, we have defined 'UserDefinedFunction', which is the base
>>>>>>>>>>>> class for all user-defined functions.
>>>>>>>>>>>> We can define determinism, requirements, etc. in this class.
>>>>>>>>>>>> Defining determinism is already supported.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
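Why a declared characteristic such as determinism matters can be shown with a minimal sketch: a `UserDefinedFunction` base class exposes `is_deterministic()` (defaulting to `True`), and a toy constant-folding step pre-computes a call with literal arguments only when the function is deterministic. The class names, the `is_deterministic` method name and the `plan_call` helper are assumptions of this sketch that mirror the base-class idea from the reply above, not a confirmed API.

```python
import itertools
from typing import Any, Callable, Tuple


class UserDefinedFunction:
    """Hypothetical base class carrying function characteristics."""

    def is_deterministic(self) -> bool:
        return True  # default: same inputs always give the same output


class ScalarFunction(UserDefinedFunction):
    def eval(self, *args: Any) -> Any:
        raise NotImplementedError


class AddOne(ScalarFunction):
    def eval(self, i: int) -> int:
        return i + 1


class NextId(ScalarFunction):
    """Returns a new value on every call, so it must not be pre-computed."""

    def __init__(self) -> None:
        self._ids = itertools.count()

    def eval(self) -> int:
        return next(self._ids)

    def is_deterministic(self) -> bool:
        return False


def plan_call(func: ScalarFunction, literal_args: Tuple[Any, ...]) -> Callable[[], Any]:
    """Toy constant folding: pre-compute only deterministic calls."""
    if func.is_deterministic():
        result = func.eval(*literal_args)  # evaluated once, at planning time
        return lambda: result
    return lambda: func.eval(*literal_args)  # evaluated again for every row


folded = plan_call(AddOne(), (41,))
per_row = plan_call(NextId(), ())
print(folded(), folded())    # 42 42
print(per_row(), per_row())  # 0 1
```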
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 02.09.19 03:38, Shaoxuan Wang wrote:
>>>>>>>>>>>>>> Hi Jincheng, Fudian, and Aljoscha,
>>>>>>>>>>>>>> I am assuming the proposed Python UDX can also be applied to Flink SQL.
>>>>>>>>>>>>>> Is this correct? If yes, I would suggest titling the FLIP "Flink Python
>>>>>>>>>>>>>> User-Defined Function" or "Flink Python User-Defined Function for Table".
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Aug 28, 2019 at 12:22 PM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the feedback, Bowen!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Many thanks for creating the FLIP and bringing up the VOTE, Dian!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Aug 28, 2019 at 11:32 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have started a voting thread [1]. Thanks a lot for your help
>>>>>>>>>>>>>>>> creating the FLIP, @Jincheng.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Bowen,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks a lot for your comments. I have replied to you in the design
>>>>>>>>>>>>>>>> doc. As it seems that the comments don't affect the overall design, I
>>>>>>>>>>>>>>>> won't cancel the vote for now, and we can continue the discussion in the
>>>>>>>>>>>>>>>> design doc.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 28. Aug 2019, at 11:05, Bowen Li <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Jincheng and Dian,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Sorry for being late to the party. I took a glance at the proposal; it
>>>>>>>>>>>>>>>>> LGTM in general, and I left only a couple of comments.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Bowen
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 8:05 PM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks! It works.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 27. Aug 2019, at 10:55, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Dian, can you check if you have edit access? :)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Aug 26, 2019 at 10:52 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks a lot for the kind tips and the offer of help. I definitely need it!
>>>>>>>>>>>>>>>>>>>> Could you grant me write permission for Confluence? My ID: Dian Fu
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 26. Aug 2019, at 09:53, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for your feedback, Hequn & Dian.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Dian, I am glad to see that you want to help create the FLIP!
>>>>>>>>>>>>>>>>>>>>> Everyone has a first time, and I am very willing to help you complete
>>>>>>>>>>>>>>>>>>>>> your first FLIP. Here are some tips:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - First, I'll give your account write permission for Confluence.
>>>>>>>>>>>>>>>>>>>>> - Before creating the FLIP, please have a look at the FLIP Template [1]
>>>>>>>>>>>>>>>>>>>>> (it's better to know more about FLIPs by reading [2]).
>>>>>>>>>>>>>>>>>>>>> - Create the Flink Python UDF related JIRAs after the VOTE on the FLIP
>>>>>>>>>>>>>>>>>>>>> has completed. (I think you can also bring up the VOTE thread, if you want!)
>>>>>>>>>>>>>>>>>>>>> If you encounter any problems during this period, feel free to tell me so
>>>>>>>>>>>>>>>>>>>>> that we can solve them together. :)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template
>>>>>>>>>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 11:54 AM Hequn Cheng <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> +1 for starting the vote.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks a lot, Jincheng, for the discussion.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Best, Hequn
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 23, 2019 at 10:06 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> +1 to start creating the FLIP and the VOTE on this feature. I'm willing
>>>>>>>>>>>>>>>>>>>>>>> to help create the FLIP if you don't mind. As I haven't created a FLIP
>>>>>>>>>>>>>>>>>>>>>>> before, it would be great if you could help with this. :)
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On 22. Aug 2019, at 23:41, jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for your feedback. If there are no more suggestions and
>>>>>>>>>>>>>>>>>>>>>>>> comments, I think it's better to initiate a vote to create a FLIP for
>>>>>>>>>>>>>>>>>>>>>>>> Apache Flink Python UDFs.
>>>>>>>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Best, Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Aug 15, 2019 at 12:54 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your confirmation and the very important reminder about
>>>>>>>>>>>>>>>>>>>>>>>>> bundle processing.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I have added a description of how to perform bundle processing from
>>>>>>>>>>>>>>>>>>>>>>>>> the perspective of checkpoints and watermarks. Feel free to leave
>>>>>>>>>>>>>>>>>>>>>>>>> comments if anything is not described clearly.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 14, 2019 at 10:08 AM Dian Fu <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the suggestions.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding bundle processing, there is a section "Checkpoint" [1] in the
>>>>>>>>>>>>>>>>>>>>>>>>>> design doc which talks about how to handle checkpoints.
>>>>>>>>>>>>>>>>>>>>>>>>>> However, I think you are right that we should say more about it, such
>>>>>>>>>>>>>>>>>>>>>>>>>> as what bundle processing is, how it affects checkpoints and watermarks,
>>>>>>>>>>>>>>>>>>>>>>>>>> how to handle checkpoints and watermarks, etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit#heading=h.urladt565yo3
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On 14. Aug 2019, at 01:01, Thomas Weise <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for putting this together. The proposal is very detailed, thorough
>>>>>>>>>>>>>>>>>>>>>>>>>>> and, for me as a Beam Flink runner contributor, easy to understand :)
>>>>>>>>>>>>>>>>>>>>>>>>>>> One thing that you should probably detail more is the bundle processing. It
>>>>>>>>>>>>>>>>>>>>>>>>>>> is critically important for performance that multiple elements are
>>>>>>>>>>>>>>>>>>>>>>>>>>> processed in a bundle. The default bundle size in the Flink runner is 1s or
>>>>>>>>>>>>>>>>>>>>>>>>>>> 1000 elements, whichever comes first. And for streaming, you can find the
>>>>>>>>>>>>>>>>>>>>>>>>>>> logic necessary to align the bundle processing with watermarks and
>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpointing here:
>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
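The bundling behaviour Thomas describes (flush after 1000 elements or 1 second, whichever comes first, and keep bundles aligned with watermarks and checkpoints) can be sketched as a small buffer. This is a simplified illustration, not the logic of Beam's `ExecutableStageDoFnOperator`: the class and method names are hypothetical, the time limit is only checked when an element arrives (a real operator would also need a timer), and flushing before forwarding a watermark or taking a snapshot is just one simple way to keep results consistent.

```python
import time
from typing import Any, Callable, List


class BundleBuffer:
    """Hypothetical sketch of count/time-bounded bundle processing."""

    def __init__(self, process_bundle: Callable[[List[Any]], None],
                 max_elements: int = 1000, max_time_s: float = 1.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._process_bundle = process_bundle
        self._max_elements = max_elements
        self._max_time_s = max_time_s
        self._clock = clock
        self._buffer: List[Any] = []
        self._bundle_start = 0.0

    def add(self, element: Any) -> None:
        if not self._buffer:
            self._bundle_start = self._clock()  # a new bundle begins
        self._buffer.append(element)
        full = len(self._buffer) >= self._max_elements
        expired = self._clock() - self._bundle_start >= self._max_time_s
        if full or expired:  # whichever limit is reached first
            self.finish_bundle()

    def finish_bundle(self) -> None:
        if self._buffer:
            bundle, self._buffer = self._buffer, []
            self._process_bundle(bundle)

    def on_watermark(self, watermark: int, emit: Callable[[int], None]) -> None:
        # Results for earlier elements must be out before the watermark passes.
        self.finish_bundle()
        emit(watermark)

    def snapshot_state(self) -> None:
        # No element may still be in flight when a checkpoint is taken.
        self.finish_bundle()


bundles: List[list] = []
# A frozen clock makes the example deterministic: only the count limit applies.
buffer = BundleBuffer(bundles.append, max_elements=3, clock=lambda: 0.0)
for i in range(7):
    buffer.add(i)
buffer.on_watermark(100, lambda wm: bundles.append(["watermark", wm]))
print(bundles)  # [[0, 1, 2], [3, 4, 5], [6], ['watermark', 100]]
```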
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 13, 2019 at 7:05 AM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The Python Table API (without Python UDF support) is already supported
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and will be available in the upcoming 1.9 release.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> As Python UDFs are very important for Python users, we'd like to start the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion about Python UDF support in the Python Table API.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Aljoscha Krettek, Dian Fu and I have discussed this offline and have drafted
>>>>>>>>>>>>>>>>>>>>>>>>>>>> a design doc [1]. It includes the following items:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function interfaces.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> - The user-defined function execution architecture.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> As mentioned by many people in the previous discussion thread [2], a
>>>>>>>>>>>>>>>>>>>>>>>>>>>> portability framework was introduced in Apache Beam in its latest releases.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> It provides well-defined, language-neutral data structures and protocols
>>>>>>>>>>>>>>>>>>>>>>>>>>>> for language-neutral user-defined function execution. This design is based
>>>>>>>>>>>>>>>>>>>>>>>>>>>> on Beam's portability framework. We will describe how to make use of Beam's
>>>>>>>>>>>>>>>>>>>>>>>>>>>> portability framework for user-defined function execution: data
>>>>>>>>>>>>>>>>>>>>>>>>>>>> transmission, state access, checkpoints, metrics, logging, etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Considering that the design relies on Beam's portability framework for
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Python user-defined function execution, and that not all contributors in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink community are familiar with Beam's portability framework, we have
>>>>>>>>>>>>>>>>>>>>>>>>>>>> built a prototype [3] as a proof of concept and to make the design easier
>>>>>>>>>>>>>>>>>>>>>>>>>>>> to understand.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Any feedback is welcome.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1WpTyCXAQh8Jr2yWfz7MWCD2-lou05QaQFb810ZvTefY/edit?usp=sharing
>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html
>>>>>>>>>>>>>>>>>>>>>>>>>>>> [3] https://github.com/dianfu/flink/commits/udf_poc
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>
>
