Hello all,
I have recently been working on adding bulk iterations to the Python API of Flink in order to facilitate a research project I am working on. The current changes can be seen in this GitHub diff:
https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0

This implementation seems to work for, at least, simple examples, such as incrementing numbers in a data set. However, with the transformations required for my project, I get an exception "java.lang.ClassCastException: [B cannot be cast to org.apache.flink.api.java.tuple.Tuple" thrown from the deserializers called by org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer. I've created the following simplified Python plan by stripping down my research project code to the problem-causing parts:
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

I have been working on this issue but I don't have any ideas on what might be the problem. Perhaps someone more knowledgeable about the interior of the Python API could kindly help?

Thank you very much.

Geoffrey Mon
Hello,
I'll try to take a look this week.

Regards,
Chesnay
Hello Geoffrey,
I could not reproduce this issue with the commits and plan you provided.

I tried out both the FLINK-4098 and bulk-iterations branches (and reverted back to the specified commits) and built Flink from scratch.

Could you double check that the code you provided produces the error? Also, which OS/Python version are you using?

Regards,
Chesnay
Hello Chesnay,
Thank you for your help. After receiving your message I recompiled my version of Flink completely, and both the NullPointerException listed in the TODO and the ClassCastException with the join operation went away. Previously, to save time, I had been recompiling only the modules of Flink that had changed, using "mvn clean install -pl :module", and apparently that may have been causing some of my issues.

Now the problem is clearer: when a specific group reduce function in my research project plan file is used within an iteration, I get a ClassCastException:

Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
    at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
    at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
    at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
    at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
    at java.lang.Thread.run(Thread.java:745)

I'm not sure why this is causing an exception, and I would greatly appreciate any assistance. I've revised the barebones error-causing plan file to focus on this new error source:
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

The group reduce function in question seems to work just fine outside of iterations. I have organized the commits and pushed to a new branch to make it easier to test and hopefully review soon:
https://github.com/GEOFBOT/flink/tree/new-iterations

Cheers,
Geoffrey
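A note on reading the two exceptions reported so far: "[B" is the JVM's name for byte[], so the first error is a byte array arriving where a Tuple was expected, and the second is a Tuple2 arriving at BytePrimitiveArraySerializer, which only handles byte[]. Both look like a mismatch between the type an operator was declared to emit and the records it actually emits. The following is only a rough Python sketch of that failure mode. All class and function names in it are hypothetical stand-ins, not Flink classes, and it makes no claim about where in the Python API the mismatch originates.

```python
# Hypothetical stand-ins for illustration only; these are NOT Flink classes.


class BytesSerializer:
    """Accepts only raw bytes, like a serializer chosen for a byte[] type."""

    def serialize(self, record):
        if not isinstance(record, (bytes, bytearray)):
            raise TypeError(
                "%s cannot be cast to bytes" % type(record).__name__)
        return bytes(record)


class TupleSerializer:
    """Accepts only tuples, like a serializer chosen for a Tuple type."""

    def serialize(self, record):
        if not isinstance(record, tuple):
            raise TypeError(
                "%s cannot be cast to tuple" % type(record).__name__)
        return repr(record).encode("utf-8")


def collect(record, serializer):
    """Hand a record to the serializer that was picked ahead of time."""
    return serializer.serialize(record)


def try_collect(record, serializer):
    """Return None on success, or the error message on a type mismatch."""
    try:
        collect(record, serializer)
        return None
    except TypeError as exc:
        return str(exc)
```

In this sketch, try_collect((1, 2), BytesSerializer()) reports a mismatch in the same direction as the second exception, and try_collect(b"x", TupleSerializer()) in the direction of the first. The serializer is fixed before any record is seen, so the error only surfaces when the first record of the wrong shape is collected.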
Hello Geoffrey,
this one works for me as well :D

Regards,
Chesnay
Hi Chesnay,
Heh, I have discovered that if I do not restart Flink after running my original problematic script, then similar issues will manifest themselves in other otherwise working scripts. I haven't been able to completely narrow down the problem, but I promise this new script will have a ClassCastException that is completely reproducible. :)
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

Thanks,
Geoffrey
Hello,
Has anyone had a chance to look into this? I am currently working on the problem but I have minimal understanding of how the internal Flink Python API works; any expertise would be greatly appreciated.

Thank you very much!

Geoffrey
Hey Geoffrey,
I was able to reproduce the error and will look into it in more detail tomorrow.

Regards,
Chesnay
A temporary workaround appears to be disabling chaining, which you can do by commenting out line 215, "self._find_chains()", in Environment.py. Note that you then run into a division-by-zero error, but I can't tell whether that is a problem of the job or not.
Thank you very much. Disabling chaining with the Python API allows my
actual script to run properly. The division by zero must be an issue with the job that I posted on gist. Does that mean that the issue must be in the chaining part of the API? Chaining, as I understand it, is an important optimization that would matter for the performance comparison I wish to make in my project. Cheers, Geoffrey |
The chaining code is definitely related, and I also have a pretty clear idea
how to fix it. The odd thing is that the Java API doesn't catch this type mismatch; the data types are known when the plan is generated. This kind of error shouldn't even happen. |
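To make the point about plan-time type information concrete, here is a minimal sketch in plain Python. It is not Flink's chaining code and not the fix discussed in this thread; `Op` and `chain` are hypothetical names invented for the illustration. It assumes every operator declares its input and output types, so that fusing operators can both reject a mismatch up front and advertise the output type of the last operator in the chain.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Op:
    """Hypothetical plan node: a function plus its declared input/output types."""
    name: str
    fn: Callable
    in_type: type
    out_type: type


def chain(ops: List[Op]) -> Op:
    """Fuse consecutive operators into one, validating types at plan time."""
    for upstream, downstream in zip(ops, ops[1:]):
        if upstream.out_type is not downstream.in_type:
            raise TypeError(
                f"{upstream.name} emits {upstream.out_type.__name__}, "
                f"but {downstream.name} expects {downstream.in_type.__name__}"
            )

    def fused(value):
        # Apply every function in order without materializing intermediates.
        for op in ops:
            value = op.fn(value)
        return value

    # The fused operator advertises the types of its two outer ends.
    return Op(
        name="+".join(op.name for op in ops),
        fn=fused,
        in_type=ops[0].in_type,
        out_type=ops[-1].out_type,
    )


# Hypothetical operators: bytes -> tuple, then tuple -> int.
to_pair = Op("to_pair", lambda b: (len(b), b), bytes, tuple)
first = Op("first", lambda t: t[0], tuple, int)

fused = chain([to_pair, first])
print(fused.name, fused.out_type.__name__, fused.fn(b"abc"))
```

In this sketch, chaining the two operators in the wrong order raises a `TypeError` while the plan is being built, rather than surfacing later as a cast failure at runtime.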
In this branch: https://github.com/zentol/flink/tree/new-iterations you
can find a more fine-grained fix for chaining with iterations. Relevant commit: ac2305d9589a5c6ab9e94d04c870fba52716d695 |
Your solution works well, many thanks. This solves the exception that I
described previously. However, in a different part of the script I ran into another problem about reusing data sets. For example, given the script at https://gist.github.com/GEOFBOT/d670f567f8c886572c8715a6058f8b34, I get an exception about a data type not having a corresponding deserializer for a broadcast variable. However, no data sets in my plan are broadcast. I noticed that in the operations diagram generated by Flink, the data set 'S' is created twice, once for each time it is used in the plan, even though I intended to reuse the data set. One of the creations involves a broadcast. Presumably this is related to an optimization built into Flink? I appreciate any insights or help you may have about this problem. Thanks, Geoffrey |
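The difference between a data set that is reused and one that is created once per use can be made concrete with a small plain-Python sketch. This does not model Flink's optimizer and says nothing about why 'S' shows up twice in the diagram; `Node` and `instances_by_name` are hypothetical names for the illustration. It only assumes a plan is a graph in which each node lists the data sets it reads from.

```python
class Node:
    """Hypothetical plan node; `parents` are the data sets it reads from."""

    def __init__(self, name, *parents):
        self.name = name
        self.parents = parents


def instances_by_name(sinks):
    """Count distinct node objects per name reachable from the given sinks."""
    seen = {}
    stack = list(sinks)
    while stack:
        node = stack.pop()
        if id(node) in seen:
            continue  # already visited this exact object
        seen[id(node)] = node
        stack.extend(node.parents)

    counts = {}
    for node in seen.values():
        counts[node.name] = counts.get(node.name, 0) + 1
    return counts


source = Node("input")

# Reuse: one 'S' object feeds both consumers.
s = Node("S", source)
shared = [Node("join", s, Node("A", source)), Node("cross", s)]

# Duplication: 'S' is rebuilt for every consumer.
rebuilt = [
    Node("join", Node("S", source), Node("A", source)),
    Node("cross", Node("S", source)),
]

print(instances_by_name(shared)["S"], instances_by_name(rebuilt)["S"])
```

A check like this on whatever plan representation is available could help tell apart a plan that was built with duplicated nodes from one where a later stage duplicated a shared node.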