[jira] [Created] (FLINK-19725) DataStreamTests.test_key_by_on_connect_stream is failed on Azure

Shang Yuanchun (Jira)
Jark Wu created FLINK-19725:
-------------------------------

             Summary: DataStreamTests.test_key_by_on_connect_stream is failed on Azure
                 Key: FLINK-19725
                 URL: https://issues.apache.org/jira/browse/FLINK-19725
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Jark Wu
             Fix For: 1.12.0


Here is an instance of the failure:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=7877&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=8d78fe4f-d658-5c70-12f8-4921589024c3&l=20270

{code}
2020-10-19T18:06:30.6704453Z =================================== FAILURES ===================================
2020-10-19T18:06:30.6705106Z ________________ DataStreamTests.test_key_by_on_connect_stream _________________
2020-10-19T18:06:30.6705465Z
2020-10-19T18:06:30.6705934Z self = <pyflink.datastream.tests.test_data_stream.DataStreamTests testMethod=test_key_by_on_connect_stream>
2020-10-19T18:06:30.6707720Z
2020-10-19T18:06:30.6708558Z     def test_key_by_on_connect_stream(self):
2020-10-19T18:06:30.6710053Z         ds1 = self.env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1), ('e', 2)],
2020-10-19T18:06:30.6710691Z                                        type_info=Types.ROW([Types.STRING(), Types.INT()])) \
2020-10-19T18:06:30.6711381Z             .key_by(MyKeySelector(), key_type_info=Types.INT())
2020-10-19T18:06:30.6717965Z         ds2 = self.env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1), ('e', 2)],
2020-10-19T18:06:30.6718712Z                                        type_info=Types.ROW([Types.STRING(), Types.INT()]))
2020-10-19T18:06:30.6719278Z    
2020-10-19T18:06:30.6719679Z         class AssertKeyCoMapFunction(CoMapFunction):
2020-10-19T18:06:30.6720136Z             def __init__(self):
2020-10-19T18:06:30.6720528Z                 self.pre1 = None
2020-10-19T18:06:30.6720881Z                 self.pre2 = None
2020-10-19T18:06:30.6721195Z    
2020-10-19T18:06:30.6721510Z             def map1(self, value):
2020-10-19T18:06:30.6722215Z                 if value[0] == 'b':
2020-10-19T18:06:30.6722849Z                     assert self.pre1 == 'a'
2020-10-19T18:06:30.6723621Z                 if value[0] == 'd':
2020-10-19T18:06:30.6724247Z                     assert self.pre1 == 'c'
2020-10-19T18:06:30.6724652Z                 self.pre1 = value[0]
2020-10-19T18:06:30.6725002Z                 return value
2020-10-19T18:06:30.6730397Z    
2020-10-19T18:06:30.6730856Z             def map2(self, value):
2020-10-19T18:06:30.6731728Z                 if value[0] == 'b':
2020-10-19T18:06:30.6732399Z                     assert self.pre2 == 'a'
2020-10-19T18:06:30.6733025Z                 if value[0] == 'd':
2020-10-19T18:06:30.6733769Z                     assert self.pre2 == 'c'
2020-10-19T18:06:30.6734159Z                 self.pre2 = value[0]
2020-10-19T18:06:30.6734521Z                 return value
2020-10-19T18:06:30.6735084Z    
2020-10-19T18:06:30.6735376Z         ds1.connect(ds2)\
2020-10-19T18:06:30.6735862Z             .key_by(MyKeySelector(), MyKeySelector(), key_type_info=Types.INT())\
2020-10-19T18:06:30.6736369Z             .map(AssertKeyCoMapFunction())\
2020-10-19T18:06:30.6736761Z             .add_sink(self.test_sink)
2020-10-19T18:06:30.6737079Z    
2020-10-19T18:06:30.6737718Z >       self.env.execute('key_by_test')
2020-10-19T18:06:30.6737996Z
2020-10-19T18:06:30.6738342Z pyflink/datastream/tests/test_data_stream.py:206:
2020-10-19T18:06:30.6738880Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-10-19T18:06:30.6739598Z pyflink/datastream/stream_execution_environment.py:621: in execute
2020-10-19T18:06:30.6740205Z     return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
2020-10-19T18:06:30.6741118Z .tox/py35-cython/lib/python3.5/site-packages/py4j/java_gateway.py:1286: in __call__
2020-10-19T18:06:30.6741690Z     answer, self.gateway_client, self.target_id, self.name)
2020-10-19T18:06:30.6745640Z pyflink/util/exceptions.py:147: in deco
2020-10-19T18:06:30.6746059Z     return f(*a, **kw)
2020-10-19T18:06:30.6746516Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-10-19T18:06:30.6746870Z
2020-10-19T18:06:30.6747564Z answer = 'xro4008'
2020-10-19T18:06:30.6748011Z gateway_client = <py4j.java_gateway.GatewayClient object at 0x7f3087c37748>
2020-10-19T18:06:30.6748717Z target_id = 'o3660', name = 'execute'
2020-10-19T18:06:30.6749062Z
2020-10-19T18:06:30.6749495Z     def get_return_value(answer, gateway_client, target_id=None, name=None):
2020-10-19T18:06:30.6750188Z         """Converts an answer received from the Java gateway into a Python object.
2020-10-19T18:06:30.6750598Z    
2020-10-19T18:06:30.6751006Z         For example, string representation of integers are converted to Python
2020-10-19T18:06:30.6751559Z         integer, string representation of objects are converted to JavaObject
2020-10-19T18:06:30.6752020Z         instances, etc.
2020-10-19T18:06:30.6752313Z    
2020-10-19T18:06:30.6752693Z         :param answer: the string returned by the Java gateway
2020-10-19T18:06:30.6753379Z         :param gateway_client: the gateway client used to communicate with the Java
2020-10-19T18:06:30.6753980Z             Gateway. Only necessary if the answer is a reference (e.g., object,
2020-10-19T18:06:30.6754411Z             list, map)
2020-10-19T18:06:30.6754857Z         :param target_id: the name of the object from which the answer comes from
2020-10-19T18:06:30.6755430Z             (e.g., *object1* in `object1.hello()`). Optional.
2020-10-19T18:06:30.6755969Z         :param name: the name of the member from which the answer comes from
2020-10-19T18:06:30.6756522Z             (e.g., *hello* in `object1.hello()`). Optional.
2020-10-19T18:06:30.6756888Z         """
2020-10-19T18:06:30.6757327Z         if is_error(answer)[0]:
2020-10-19T18:06:30.6757721Z             if len(answer) > 1:
2020-10-19T18:06:30.6758122Z                 type = answer[1]
2020-10-19T18:06:30.6758610Z                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
2020-10-19T18:06:30.6759288Z                 if answer[1] == REFERENCE_TYPE:
2020-10-19T18:06:30.6759725Z                     raise Py4JJavaError(
2020-10-19T18:06:30.6760202Z                         "An error occurred while calling {0}{1}{2}.\n".
2020-10-19T18:06:30.6760710Z >                       format(target_id, ".", name), value)
2020-10-19T18:06:30.6761374Z E                   py4j.protocol.Py4JJavaError: An error occurred while calling o3660.execute.
2020-10-19T18:06:30.6762068Z E                   : java.lang.Exception: Could not create actor system
2020-10-19T18:06:30.6762857Z E                   at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:263)
2020-10-19T18:06:30.6763949Z E                   at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:341)
2020-10-19T18:06:30.6765129Z E                   at org.apache.flink.runtime.metrics.util.MetricUtils.startMetricRpcService(MetricUtils.java:152)
2020-10-19T18:06:30.6766058Z E                   at org.apache.flink.runtime.metrics.util.MetricUtils.startLocalMetricsRpcService(MetricUtils.java:142)
2020-10-19T18:06:30.6766915Z E                   at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:275)
2020-10-19T18:06:30.6767883Z E                   at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:74)
2020-10-19T18:06:30.6768798Z E                   at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
2020-10-19T18:06:30.6769938Z E                   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1867)
2020-10-19T18:06:30.6770886Z E                   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1763)
2020-10-19T18:06:30.6771888Z E                   at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
2020-10-19T18:06:30.6772685Z E                   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-10-19T18:06:30.6773464Z E                   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-10-19T18:06:30.6774309Z E                   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-10-19T18:06:30.6775064Z E                   at java.lang.reflect.Method.invoke(Method.java:498)
2020-10-19T18:06:30.6775849Z E                   at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
2020-10-19T18:06:30.6776765Z E                   at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
2020-10-19T18:06:30.6777692Z E                   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
2020-10-19T18:06:30.6778577Z E                   at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
2020-10-19T18:06:30.6779591Z E                   at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
2020-10-19T18:06:30.6780471Z E                   at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
2020-10-19T18:06:30.6781193Z E                   at java.lang.Thread.run(Thread.java:748)
2020-10-19T18:06:30.6783433Z E                   Caused by: akka.ConfigurationException: Could not start logger due to [akka.ConfigurationException: Logger specified in config can't be loaded [akka.event.slf4j.Slf4jLogger] due to [akka.event.Logging$LoggerInitializationException: Logger log1-Slf4jLogger did not respond with LoggerInitialized, sent instead [TIMEOUT]]]
2020-10-19T18:06:30.6784902Z E                   at akka.event.LoggingBus$class.startDefaultLoggers(Logging.scala:147)
2020-10-19T18:06:30.6785646Z E                   at akka.event.EventStream.startDefaultLoggers(EventStream.scala:22)
2020-10-19T18:06:30.6786417Z E                   at akka.actor.LocalActorRefProvider.init(ActorRefProvider.scala:662)
2020-10-19T18:06:30.6787228Z E                   at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:874)
2020-10-19T18:06:30.6787974Z E                   at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:870)
2020-10-19T18:06:30.6788680Z E                   at akka.actor.ActorSystemImpl._start(ActorSystem.scala:870)
2020-10-19T18:06:30.6789462Z E                   at akka.actor.ActorSystemImpl.start(ActorSystem.scala:891)
2020-10-19T18:06:30.6790182Z E                   at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
2020-10-19T18:06:30.6790915Z E                   at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
2020-10-19T18:06:30.6791819Z E                   at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
2020-10-19T18:06:30.6792597Z E                   at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
2020-10-19T18:06:30.6793524Z E                   at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
2020-10-19T18:06:30.6794301Z E                   at org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:276)
2020-10-19T18:06:30.6795256Z E                   at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:260)
2020-10-19T18:06:30.6796016Z E                   ... 20 more
2020-10-19T18:06:30.6796252Z
2020-10-19T18:06:30.6797013Z .tox/py35-cython/lib/python3.5/site-packages/py4j/protocol.py:328: Py4JJavaError
{code}
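
The trace above suggests the failure comes from starting the local actor system for the MiniCluster (the Slf4jLogger did not respond in time), not from the assertions in the test itself. For triaging similar Azure logs, here is a minimal sketch that pulls the exception headline and the "Caused by" lines out of a pytest `E` trace. It assumes the "timestamp, then `E`, then message" layout shown in the log above; the function name `extract_causes` and the shortened sample text are illustrative only and are not part of Flink or pytest.

```python
import re

# Assumed layout: "<timestamp>Z E   <message>", as in the Azure log above.
_E_LINE = re.compile(r"^\S+Z\s+E\s+(?P<msg>.*)$")


def extract_causes(log_text):
    """Return exception headlines (outermost first) from a pytest 'E' trace."""
    causes = []
    for raw in log_text.splitlines():
        match = _E_LINE.match(raw)
        if not match:
            continue  # not an 'E' line of the trace
        msg = match.group("msg").strip()
        if msg.startswith(": "):
            # py4j prints the Java exception as ": <class>: <message>"
            causes.append(msg[2:])
        elif msg.startswith("Caused by: "):
            causes.append(msg[len("Caused by: "):])
    return causes


# Shortened sample; the "[...]" stands for the rest of the original message.
sample = """\
2020-10-19T18:06:30.6762068Z E                   : java.lang.Exception: Could not create actor system
2020-10-19T18:06:30.6762857Z E                   at org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:263)
2020-10-19T18:06:30.6783433Z E                   Caused by: akka.ConfigurationException: Could not start logger due to [...]
"""

for cause in extract_causes(sample):
    print(cause)
```

Applied to the full log, the last entry returned is the innermost "Caused by" line, which is usually the most useful one to quote when linking related flaky-test tickets.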



