Dian Fu created FLINK-20046:
------------------------------- Summary: StreamTableAggregateTests.test_map_view_iterate is unstable Key: FLINK-20046 URL: https://issues.apache.org/jira/browse/FLINK-20046 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.12.0 Reporter: Dian Fu Fix For: 1.12.0 https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9279&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=4fad9527-b9a5-5015-1b70-8356e5c91490 {code} 2020-11-07T22:50:57.4180758Z _______________ StreamTableAggregateTests.test_map_view_iterate ________________ 2020-11-07T22:50:57.4181301Z 2020-11-07T22:50:57.4181965Z self = <pyflink.table.tests.test_aggregate.StreamTableAggregateTests testMethod=test_map_view_iterate> 2020-11-07T22:50:57.4182348Z 2020-11-07T22:50:57.4182535Z def test_map_view_iterate(self): 2020-11-07T22:50:57.4182812Z test_iterate = udaf(TestIterateAggregateFunction()) 2020-11-07T22:50:57.4183320Z self.t_env.get_config().set_idle_state_retention(datetime.timedelta(days=1)) 2020-11-07T22:50:57.4183763Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4297555Z "python.fn-execution.bundle.size", "2") 2020-11-07T22:50:57.4297922Z # trigger the cache eviction in a bundle. 
2020-11-07T22:50:57.4308028Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4308653Z "python.state.cache-size", "2") 2020-11-07T22:50:57.4308945Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4309382Z "python.map-state.read-cache-size", "2") 2020-11-07T22:50:57.4309676Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4310428Z "python.map-state.write-cache-size", "2") 2020-11-07T22:50:57.4310701Z self.t_env.get_config().get_configuration().set_string( 2020-11-07T22:50:57.4311130Z "python.map-state.iterate-response-batch-size", "2") 2020-11-07T22:50:57.4311361Z t = self.t_env.from_elements( 2020-11-07T22:50:57.4311691Z [(1, 'Hi_', 'hi'), 2020-11-07T22:50:57.4312004Z (1, 'Hi', 'hi'), 2020-11-07T22:50:57.4312316Z (2, 'hello', 'hello'), 2020-11-07T22:50:57.4312639Z (3, 'Hi_', 'hi'), 2020-11-07T22:50:57.4312975Z (3, 'Hi', 'hi'), 2020-11-07T22:50:57.4313285Z (4, 'hello', 'hello'), 2020-11-07T22:50:57.4313609Z (5, 'Hi2_', 'hi'), 2020-11-07T22:50:57.4313908Z (5, 'Hi2', 'hi'), 2020-11-07T22:50:57.4314238Z (6, 'hello2', 'hello'), 2020-11-07T22:50:57.4314558Z (7, 'Hi', 'hi'), 2020-11-07T22:50:57.4315053Z (8, 'hello', 'hello'), 2020-11-07T22:50:57.4315396Z (9, 'Hi2', 'hi'), 2020-11-07T22:50:57.4315773Z (13, 'Hi3', 'hi')], ['a', 'b', 'c']) 2020-11-07T22:50:57.4316023Z self.t_env.create_temporary_view("source", t) 2020-11-07T22:50:57.4316299Z table_with_retract_message = self.t_env.sql_query( 2020-11-07T22:50:57.4316615Z "select LAST_VALUE(b) as b, LAST_VALUE(c) as c from source group by a") 2020-11-07T22:50:57.4316919Z result = table_with_retract_message.group_by(t.c) \ 2020-11-07T22:50:57.4317197Z .select(test_iterate(t.b).alias("a"), t.c) \ 2020-11-07T22:50:57.4317619Z .select(col("a").get(0).alias("a"), 2020-11-07T22:50:57.4318111Z col("a").get(1).alias("b"), 2020-11-07T22:50:57.4318357Z col("a").get(2).alias("c"), 2020-11-07T22:50:57.4318586Z col("a").get(3).alias("d"), 
2020-11-07T22:50:57.4318814Z t.c.alias("e")) 2020-11-07T22:50:57.4319023Z assert_frame_equal( 2020-11-07T22:50:57.4319208Z > result.to_pandas(), 2020-11-07T22:50:57.4319408Z pd.DataFrame([ 2020-11-07T22:50:57.4319872Z ["hello,hello2", "1,3", 'hello:3,hello2:1', 2, "hello"], 2020-11-07T22:50:57.4320398Z ["Hi,Hi2,Hi3", "1,2,3", "Hi:3,Hi2:2,Hi3:1", 3, "hi"]], 2020-11-07T22:50:57.4321047Z columns=['a', 'b', 'c', 'd', 'e'])) 2020-11-07T22:50:57.4321198Z 2020-11-07T22:50:57.4321385Z pyflink/table/tests/test_aggregate.py:468: 2020-11-07T22:50:57.4321648Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 2020-11-07T22:50:57.4322040Z pyflink/table/table.py:807: in to_pandas 2020-11-07T22:50:57.4322299Z .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size) 2020-11-07T22:50:57.4322794Z .tox/py35-cython/lib/python3.5/site-packages/py4j/java_gateway.py:1286: in __call__ 2020-11-07T22:50:57.4323103Z answer, self.gateway_client, self.target_id, self.name) 2020-11-07T22:50:57.4323351Z pyflink/util/exceptions.py:147: in deco 2020-11-07T22:50:57.4323537Z return f(*a, **kw) 2020-11-07T22:50:57.4323783Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 2020-11-07T22:50:57.4323963Z 2020-11-07T22:50:57.4324225Z answer = 'xro8653' 2020-11-07T22:50:57.4324496Z gateway_client = <py4j.java_gateway.GatewayClient object at 0x7fe5c619db70> 2020-11-07T22:50:57.4324943Z target_id = 'z:org.apache.flink.table.runtime.arrow.ArrowUtils' 2020-11-07T22:50:57.4325312Z name = 'collectAsPandasDataFrame' 2020-11-07T22:50:57.4325439Z 2020-11-07T22:50:57.4325839Z def get_return_value(answer, gateway_client, target_id=None, name=None): 2020-11-07T22:50:57.4326420Z """Converts an answer received from the Java gateway into a Python object. 
2020-11-07T22:50:57.4326648Z 2020-11-07T22:50:57.4326881Z For example, string representation of integers are converted to Python 2020-11-07T22:50:57.4327193Z integer, string representation of objects are converted to JavaObject 2020-11-07T22:50:57.4327451Z instances, etc. 2020-11-07T22:50:57.4327614Z 2020-11-07T22:50:57.4327819Z :param answer: the string returned by the Java gateway 2020-11-07T22:50:57.4328157Z :param gateway_client: the gateway client used to communicate with the Java 2020-11-07T22:50:57.4329738Z Gateway. Only necessary if the answer is a reference (e.g., object, 2020-11-07T22:50:57.4330018Z list, map) 2020-11-07T22:50:57.4330273Z :param target_id: the name of the object from which the answer comes from 2020-11-07T22:50:57.4330588Z (e.g., *object1* in `object1.hello()`). Optional. 2020-11-07T22:50:57.4330873Z :param name: the name of the member from which the answer comes from 2020-11-07T22:50:57.4331170Z (e.g., *hello* in `object1.hello()`). Optional. 2020-11-07T22:50:57.4331375Z """ 2020-11-07T22:50:57.4331542Z if is_error(answer)[0]: 2020-11-07T22:50:57.4331761Z if len(answer) > 1: 2020-11-07T22:50:57.4331954Z type = answer[1] 2020-11-07T22:50:57.4332222Z value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 2020-11-07T22:50:57.4332531Z if answer[1] == REFERENCE_TYPE: 2020-11-07T22:50:57.4332757Z raise Py4JJavaError( 2020-11-07T22:50:57.4333016Z "An error occurred while calling {0}{1}{2}.\n". 2020-11-07T22:50:57.4333303Z > format(target_id, ".", name), value) 2020-11-07T22:50:57.4333700Z E py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame. 2020-11-07T22:50:57.4334558Z E : java.lang.RuntimeException: Could not remove element ',,,1,hi', should never happen. 
2020-11-07T22:50:57.4335019Z E at org.apache.flink.table.runtime.arrow.ArrowUtils.filterOutRetractRows(ArrowUtils.java:708) 2020-11-07T22:50:57.4335479Z E at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:635) 2020-11-07T22:50:57.4336238Z E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2020-11-07T22:50:57.4336645Z E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2020-11-07T22:50:57.4337099Z E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2020-11-07T22:50:57.4337485Z E at java.lang.reflect.Method.invoke(Method.java:498) 2020-11-07T22:50:57.4337911Z E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 2020-11-07T22:50:57.4338410Z E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 2020-11-07T22:50:57.4338859Z E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) 2020-11-07T22:50:57.4339324Z E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 2020-11-07T22:50:57.4339810Z E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) 2020-11-07T22:50:57.4340260Z E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) 2020-11-07T22:50:57.4340651Z E at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |