YufeiLiu created FLINK-21208:
--------------------------------

             Summary: pyarrow exception when using window with pandas udaf
                 Key: FLINK-21208
                 URL: https://issues.apache.org/jira/browse/FLINK-21208
             Project: Flink
          Issue Type: Improvement
          Components: API / Python
    Affects Versions: 1.12.0
            Reporter: YufeiLiu


I wrote a PyFlink demo and ran it in a local environment. The logic is simple: generate records and aggregate them in a 100-second tumbling window using a pandas UDAF.
The job failed after several minutes. I don't think it's a resource problem, because the amount of data is small. Here is the error trace:

{code:java}
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1108)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1082)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1213)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:302)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.RuntimeException: Failed to close remote bundle}
    ... 11 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:371)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
    ... 10 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 292, in apache_beam.runners.worker.operations.Operation.process
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 73, in process
    for value in o.value:
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 146, in open_stream
    return RecordBatchStreamReader(source)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 62, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch

    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:369)
    ... 15 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 292, in apache_beam.runners.worker.operations.Operation.process
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 73, in process
    for value in o.value:
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 146, in open_stream
    return RecordBatchStreamReader(source)
  File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 62, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
{code}

And my test code:

{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.udf import udaf, AggregateFunction
from pyflink.table.window import Tumble


class MyTestAggregateFunction(AggregateFunction):

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        return Row(0)

    def accumulate(self, accumulator, *args):
        accumulator[0] = len(args[0])

    def get_result_type(self):
        return DataTypes.BIGINT()


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    f_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(env, None, f_s_settings)

    my_udaf = udaf(MyTestAggregateFunction(), func_type="pandas")
    t_env.register_function('my_udaf', my_udaf)

    t_env.sql_update("""
        CREATE TABLE `source_table` (
            `header` STRING,
            ts AS PROCTIME()
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '100'
        )
    """)

    t_env.sql_update("""
        CREATE TABLE `sink_table` (
            `content` BIGINT,
            `wstart` TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """)

    t_env.scan("source_table") \
        .window(Tumble.over("100.second").on("ts").alias("w")) \
        .group_by('w') \
        .select("my_udaf(header), w.start") \
        .insert_into("sink_table")

    t_env.execute("test_job")
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)