awayne created FLINK-21434:
------------------------------
Summary: When a UDAF returns a ROW type with more than 14 fields, a crash happens
Key: FLINK-21434
URL: https://issues.apache.org/jira/browse/FLINK-21434
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.12.1
Environment: python 3.7.5, pyflink 1.12.1
Reporter: awayne

Code (a simple UDAF that returns a Row containing 15 fields):
{code:python}
from pyflink.common import Row
from pyflink.table.udf import AggregateFunction, udaf
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment


class Test(AggregateFunction):

    def create_accumulator(self):
        return Row(0, 0)

    def get_value(self, accumulator):
        return Row(1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23,
                   1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23)

    def accumulate(self, accumulator, a, b):
        pass

    def get_result_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("f1", DataTypes.FLOAT()),
            DataTypes.FIELD("f2", DataTypes.FLOAT()),
            DataTypes.FIELD("f3", DataTypes.FLOAT()),
            DataTypes.FIELD("f4", DataTypes.FLOAT()),
            DataTypes.FIELD("f5", DataTypes.FLOAT()),
            DataTypes.FIELD("f6", DataTypes.FLOAT()),
            DataTypes.FIELD("f7", DataTypes.FLOAT()),
            DataTypes.FIELD("f8", DataTypes.FLOAT()),
            DataTypes.FIELD("f9", DataTypes.FLOAT()),
            DataTypes.FIELD("f10", DataTypes.FLOAT()),
            DataTypes.FIELD("f11", DataTypes.FLOAT()),
            DataTypes.FIELD("f12", DataTypes.FLOAT()),
            DataTypes.FIELD("f13", DataTypes.FLOAT()),
            DataTypes.FIELD("f14", DataTypes.FLOAT()),
            DataTypes.FIELD("f15", DataTypes.FLOAT())
        ])

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("f1", DataTypes.BIGINT()),
            DataTypes.FIELD("f2", DataTypes.BIGINT())])


def udaf_test():
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    table_env = StreamTableEnvironment.create(environment_settings=env_settings)
    test = udaf(Test())
    table_env.execute_sql("""
        CREATE TABLE print_sink (
            `name` STRING,
            `agg` ROW<f1 FLOAT, f2 FLOAT, f3 FLOAT, f4 FLOAT, f5 FLOAT, f6 FLOAT, f7 FLOAT, f8 FLOAT, f9 FLOAT, f10 FLOAT, f11 FLOAT, f12 FLOAT, f13 FLOAT, f14 FLOAT, f15 FLOAT>
        ) WITH (
            'connector' = 'print'
        )
    """)
    table = table_env.from_elements([(1, 2, "Lee")], ['value', 'count', 'name'])
    result_table = table.group_by(table.name)\
        .select(table.name, test(table.value, table.count))
    result_table.execute_insert("print_sink").wait()


if __name__ == "__main__":
    udaf_test()
{code}

Exception:
{code:java}
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at java.base/java.io.DataInputStream.readFloat(DataInputStream.java:451)
	at org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:72)
	at org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:30)
	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |