Huang Xingbo created FLINK-20507:
------------------------------------
Summary: Support Aggregate Operation in Python Table API
Key: FLINK-20507
URL: https://issues.apache.org/jira/browse/FLINK-20507
Project: Flink
Issue Type: Sub-task
Components: API / Python
Reporter: Huang Xingbo
Fix For: 1.13.0
Support Python UDAFs (both general and Pandas UDAFs) in the aggregate operation of the Python Table API.
Usage:
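{code:python}
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udaf

t_env = ...  # type: TableEnvironment
t = ...  # type: Table, table schema: [a: String, b: Int, c: Int]

# General Python UDAF: register it, then call it in select() after group_by()
# (GeneralPythonAggregateFunction is a placeholder for a user-defined AggregateFunction)
t_env.create_temporary_function("agg", GeneralPythonAggregateFunction())
t.group_by(t.c).select("agg(a)")

# Pandas UDAF: returns one Row per group; alias() names its fields d and f
mean_max_udaf = udaf(lambda a: Row(a.mean(), a.max()),
                     result_type=DataTypes.ROW(
                         [DataTypes.FIELD("a", DataTypes.FLOAT()),
                          DataTypes.FIELD("b", DataTypes.INT())]),
                     func_type="pandas")
t.group_by(t.a).aggregate(mean_max_udaf(t.b).alias("d", "f")).select("a, d, f")
{code}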
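For readers without a PyFlink setup, the semantics of the two queries in the usage example can be modeled in plain Python. This is a minimal sketch, not PyFlink code: ConcatAggregate, group_by_select, group_by_aggregate and mean_max are hypothetical names. The accumulator methods only mirror the create_accumulator / accumulate / get_value lifecycle of a PyFlink AggregateFunction. Rows are assumed to be dicts that follow the schema [a: String, b: Int, c: Int].

```python
from collections import defaultdict
from statistics import mean

# Plain-Python model of the two usage queries; this does NOT use PyFlink.
# Rows are dicts following the schema [a: String, b: Int, c: Int].


class ConcatAggregate:
    """Hypothetical general UDAF that mirrors the create_accumulator /
    accumulate / get_value lifecycle of a PyFlink AggregateFunction."""

    def create_accumulator(self):
        return []

    def accumulate(self, acc, value):
        acc.append(value)

    def get_value(self, acc):
        return ",".join(acc)


def group_by_select(rows, key, func, arg):
    # Models t.group_by(key).select("agg(arg)"): one accumulator per group,
    # and one scalar result per group.
    accs = {}
    for row in rows:
        k = row[key]
        if k not in accs:
            accs[k] = func.create_accumulator()
        func.accumulate(accs[k], row[arg])
    return [func.get_value(acc) for acc in accs.values()]


def mean_max(values):
    # Same logic as the Pandas UDAF lambda, applied to a list instead of a pandas.Series.
    return float(mean(values)), max(values)


def group_by_aggregate(rows, key, arg):
    # Models t.group_by(key).aggregate(mean_max_udaf(arg).alias("d", "f"))
    #          .select("a, d, f"): the per-group Row is flattened into columns d and f.
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[arg])
    result = []
    for k, values in groups.items():
        d, f = mean_max(values)
        result.append({key: k, "d": d, "f": f})
    return result


rows = [
    {"a": "x", "b": 1, "c": 10},
    {"a": "y", "b": 4, "c": 10},
    {"a": "x", "b": 3, "c": 20},
]
print(group_by_select(rows, "c", ConcatAggregate(), "a"))
print(group_by_aggregate(rows, "a", "b"))
```

The model highlights the difference between the two cases. A general UDAF used in select() yields one value per group. With the aggregate operation, the UDAF returns a Row whose fields are expanded into separate columns (here d and f), and select() can reference them alongside the grouping key.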