Exception from in-progress implementation of Python API bulk iterations

Geoffrey Mon
Hello all,

I have recently been adding bulk iterations to Flink's Python API to
support a research project I am working on. The current changes can be
seen in this GitHub diff:
https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
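For context, a bulk iteration repeatedly applies a step function to the entire data set, feeding each round's result (the partial solution) into the next round until the maximum number of iterations is reached. The plain-Java sketch below illustrates only those semantics on a single machine, using the "incrementing numbers" case as the step. It is not the Flink API or the Python API in the branch above, and `BulkIterationSketch`, `iterate` and `incrementAll` are hypothetical names. In Flink's Java DataSet API the same idea is expressed with `DataSet.iterate(maxIterations)` and closed with `IterativeDataSet.closeWith(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical single-machine illustration of bulk-iteration semantics (not Flink code).
public class BulkIterationSketch {
    // Applies `step` to the whole data set `maxIterations` times. The result of
    // each round (the partial solution) is the input of the next round.
    static <T> List<T> iterate(List<T> initial, UnaryOperator<List<T>> step, int maxIterations) {
        List<T> partial = new ArrayList<>(initial);
        for (int i = 0; i < maxIterations; i++) {
            partial = step.apply(partial);
        }
        return partial;
    }

    // Step function for the "increment every number" example.
    static List<Integer> incrementAll(List<Integer> dataSet) {
        List<Integer> out = new ArrayList<>(dataSet.size());
        for (int x : dataSet) {
            out.add(x + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> result = iterate(Arrays.asList(0, 10, 20), BulkIterationSketch::incrementAll, 5);
        System.out.println(result); // [5, 15, 25]
    }
}
```

In a distributed engine the step would be a dataflow graph rather than a Java function, but the contract is the same: the whole data set goes around the loop each round.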

This implementation seems to work for at least simple examples, such as
incrementing the numbers in a data set. However, the transformations
required for my project fail with "java.lang.ClassCastException:
[B cannot be cast to org.apache.flink.api.java.tuple.Tuple", thrown from the
deserializers called by
org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
I've created the following simplified Python plan by stripping my
research project code down to the parts that cause the problem:
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
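The `[B` in this message is the JVM's internal name for the class `byte[]`: array classes are named with `[` followed by a type code (`B` for byte, `I` for int, `L<class>;` for object types). So the message says a raw byte array arrived where a Flink `Tuple` was expected. The standard-library-only Java sketch below (class and method names are hypothetical) prints those internal names and triggers the same kind of failing cast, with `CharSequence` standing in for `Tuple`. The exact wording of `ClassCastException` messages differs between Java versions, so the message is printed rather than quoted.

```java
// Hypothetical helper showing how the JVM names array classes in error messages.
public class ArrayClassNames {
    // Returns the JVM's internal class name, e.g. "[B" for byte[].
    static String internalName(Class<?> type) {
        return type.getName();
    }

    // Performs the kind of cast a deserializer would do and returns the
    // exception message, or null if the cast succeeds.
    // CharSequence stands in for Flink's Tuple here.
    static String castFailure(Object value) {
        try {
            CharSequence unused = (CharSequence) value;
            return null;
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(internalName(byte[].class));   // [B
        System.out.println(internalName(int[].class));    // [I
        System.out.println(internalName(String[].class)); // [Ljava.lang.String;
        System.out.println(byte[].class.getSimpleName()); // byte[]
        // A byte[] is not a CharSequence, so the message names the source class "[B".
        System.out.println(castFailure(new byte[] {1, 2, 3}));
        System.out.println(castFailure("ok")); // null
    }
}
```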

I have been working on this issue, but I have no idea what the problem
might be. Could someone more familiar with the internals of the Python
API kindly help?

Thank you very much.

Geoffrey Mon

Re: Exception from in-progress implementation of Python API bulk iterations

Chesnay Schepler-3
Hello,

I'll try to take a look this week.

Regards,
Chesnay

On 20.09.2016 02:38, Geoffrey Mon wrote:

Re: Exception from in-progress implementation of Python API bulk iterations

Chesnay Schepler-3
Hello Geoffrey,

I could not reproduce this issue with the commits and plan you provided.

I tried both the FLINK-4098 and bulk-iterations branches (reverted to
the specified commits) and built Flink from scratch.

Could you double-check that the code you provided produces the error?
Also, which OS and Python version are you using?

Regards,
Chesnay

On 20.09.2016 11:13, Chesnay Schepler wrote:

Re: Exception from in-progress implementation of Python API bulk iterations

Geoffrey Mon
Hello Chesnay,

Thank you for your help. After receiving your message I recompiled my
version of Flink completely, and both the NullPointerException listed in
the TODO and the ClassCastException with the join operation went away.
Previously, to save time, I had only been recompiling the changed Flink
modules with "mvn clean install -pl :module", which may have caused some
of my issues.

Now the problem is clearer: when a specific group reduce function in my
research project plan file is used within an iteration, I get a
ClassCastException:
Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
    at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
    at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
    at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
    at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
    at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
    at java.lang.Thread.run(Thread.java:745)
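Read bottom-up, the trace shows records emitted by the Python map-partition operator (inside the iteration's tail task) being handed to `WorksetUpdateOutputCollector`, whose serializer is `BytePrimitiveArraySerializer`, i.e. one that expects `byte[]`, while the record is a `Tuple2`. Because Java generics are erased, nothing checks the record type until the serializer casts it. One plausible reading, not a confirmed diagnosis, is that the type information used to pick the workset serializer disagrees with what the Python operator actually emits. The sketch below reproduces only that mechanism with hypothetical stand-ins: `RecordSerializer` is not Flink's `TypeSerializer`, and `AbstractMap.SimpleEntry` plays the role of `Tuple2`.

```java
import java.util.AbstractMap;

public class SerializerMismatch {
    // Hypothetical, simplified stand-in for a type serializer (NOT Flink's TypeSerializer).
    interface RecordSerializer<T> {
        int serializedLength(T record);
    }

    // Stand-in for a serializer that only understands byte arrays,
    // playing the role of BytePrimitiveArraySerializer in the trace above.
    static final class ByteArraySerializer implements RecordSerializer<byte[]> {
        @Override
        public int serializedLength(byte[] record) {
            return 4 + record.length; // 4-byte length prefix + payload
        }
    }

    // Mimics a collector that picked its serializer from declared type information
    // but receives records from elsewhere. Because of type erasure the unchecked
    // call compiles; the cast to byte[] happens inside the serializer's
    // compiler-generated bridge method, so that is where the exception surfaces.
    @SuppressWarnings("unchecked")
    static int collect(RecordSerializer<?> serializer, Object record) {
        return ((RecordSerializer<Object>) serializer).serializedLength(record);
    }

    public static void main(String[] args) {
        RecordSerializer<?> workset = new ByteArraySerializer();
        System.out.println(collect(workset, new byte[] {1, 2, 3})); // 7

        // A tuple-like record reaches the byte[] serializer.
        try {
            collect(workset, new AbstractMap.SimpleEntry<>("key", 42));
        } catch (ClassCastException e) {
            System.out.println(e.getMessage()); // mentions "[B"; wording varies by Java version
        }
    }
}
```

The first exception in this thread was the mirror image ("[B cannot be cast to ... Tuple"), which is consistent with, though not proof of, byte-array and tuple type information being swapped somewhere between the Python side and the Java operators.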

I'm not sure why this is causing an exception, and I would greatly
appreciate any assistance. I've revised the barebones error-causing plan
file to focus on this new error source:
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
The group reduce function in question seems to work fine outside of
iterations. I have organized the commits and pushed them to a new branch
to make them easier to test and, hopefully, to review soon:
https://github.com/GEOFBOT/flink/tree/new-iterations

Cheers,
Geoffrey

On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <[hidden email]> wrote:

Re: Exception from in-progress implementation of Python API bulk iterations

Chesnay Schepler-3
Hello Geoffrey,

This one works for me as well :D

Regards,
Chesnay

On 28.09.2016 05:38, Geoffrey Mon wrote:

Re: Exception from in-progress implementation of Python API bulk iterations

Geoffrey Mon
Hi Chesnay,

Heh, I have discovered that if I do not restart Flink after running my
original problematic script, similar issues show up in other scripts that
otherwise work. I haven't been able to narrow the problem down completely,
but I promise this new script raises a ClassCastException that is
completely reproducible. :)
https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a

Thanks,
Geoffrey

On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <[hidden email]> wrote:

Re: Exception from in-progress implementation of Python API bulk iterations

Geoffrey Mon
Hello,

Has anyone had a chance to look into this? I am currently working on the
problem, but I have only a minimal understanding of how the Flink Python
API works internally; any expertise would be greatly appreciated.

Thank you very much!

Geoffrey

On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <[hidden email]> wrote:

Re: Exception from in-progress implementation of Python API bulk iterations

Chesnay Schepler-3
Hey Geoffrey,

I was able to reproduce the error and will look into it in more detail
tomorrow.

Regards,
Chesnay

On 12.10.2016 23:09, Geoffrey Mon wrote:

Re: Exception from in-progress implementation of Python API bulk iterations

Chesnay Schepler-3
A temporary workaround appears to be disabling chaining, which you can
do by commenting out L215 "self._find_chains()" in Environment.py.
Note that you then run into a division-by-zero error, but I can't tell
whether that is a problem with the job or not.
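Operator chaining generally fuses consecutive operators so that each record flows through them within one task via direct calls, instead of being materialized and shipped between separate tasks. The plain-Java sketch below contrasts the two execution modes; `ChainingSketch`, `runChained` and `runUnchained` are hypothetical names, and this is not how `_find_chains()` in Environment.py is implemented. For correct, deterministic user functions both modes should produce the same result, which is why a failure that disappears when chaining is disabled points at how chained operators are wired together (for example, which types and serializers they are given) rather than at the functions themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of chained vs. unchained execution (not Flink code).
public class ChainingSketch {
    // Chained: f and g are fused into one function and applied record by record,
    // with no intermediate data set between them.
    static <A, B, C> List<C> runChained(List<A> input, Function<A, B> f, Function<B, C> g) {
        Function<A, C> fused = f.andThen(g);
        List<C> out = new ArrayList<>();
        for (A record : input) {
            out.add(fused.apply(record));
        }
        return out;
    }

    // Unchained: f's complete output is materialized first, as if it were shipped
    // to a separate task, and only then consumed by g.
    static <A, B, C> List<C> runUnchained(List<A> input, Function<A, B> f, Function<B, C> g) {
        List<B> intermediate = new ArrayList<>();
        for (A record : input) {
            intermediate.add(f.apply(record));
        }
        List<C> out = new ArrayList<>();
        for (B record : intermediate) {
            out.add(g.apply(record));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3);
        Function<Integer, Integer> doubled = x -> x * 2;
        Function<Integer, String> labelled = x -> "v" + x;
        System.out.println(runChained(input, doubled, labelled));   // [v2, v4, v6]
        System.out.println(runUnchained(input, doubled, labelled)); // [v2, v4, v6]
    }
}
```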

On 13.10.2016 13:41, Chesnay Schepler wrote:

> Hey Geoffrey,
>
> I was able to reproduce the error and will look into it in more detail
> tomorrow.
>
> Regards,
> Chesnay
>
> On 12.10.2016 23:09, Geoffrey Mon wrote:
>> Hello,
>>
>> Has anyone had a chance to look into this? I am currently working on the
>> problem but I have minimal understanding of how the internal Flink
>> Python
>> API works; any expertise would be greatly appreciated.
>>
>> Thank you very much!
>>
>> Geoffrey
>>
>> On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <[hidden email]> wrote:
>>
>>> Hi Chesnay,
>>>
>>> Heh, I have discovered that if I do not restart Flink after running my
>>> original problematic script, then similar issues will manifest themselves
>>> in other otherwise working scripts. I haven't been able to completely
>>> narrow down the problem, but I promise this new script will have a
>>> ClassCastException that is completely reproducible. :)
>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>
>>> Thanks,
>>> Geoffrey
>>>
>>> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <[hidden email]>
>>> wrote:
>>>
>>> Hello Geoffrey,
>>>
>>> this one works for me as well :D
>>>
>>> Regards,
>>> Chesnay
>>>
>>> On 28.09.2016 05:38, Geoffrey Mon wrote:
>>>> Hello Chesnay,
>>>>
>>>> Thank you for your help. After receiving your message I recompiled my
>>>> version of Flink completely, and both the NullPointerException listed in
>>>> the TODO and the ClassCastException with the join operation went away.
>>>> Previously, I had been only recompiling the modules of Flink that had been
>>>> changed to save time using "mvn clean install -pl :module" and apparently
>>>> that may have been causing some of my issues.
>>>>
>>>> Now, the problem is more clear: when a specific group reduce function in my
>>>> research project plan file is used within an iteration, I get a
>>>> ClassCastException exception:
>>>> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
>>>> at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
>>>> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>> at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
>>>> at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
>>>> at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
>>>> at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>>>> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I'm not sure why this is causing an exception, and I would greatly
>>>> appreciate any assistance. I've revised the barebones error-causing plan
>>>> file to focus on this new error source:
>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>> The group reduce function in question seems to work just fine outside of
>>>> iterations. I have organized the commits and pushed to a new branch to make
>>>> it easier to test and hopefully review soon:
>>>> https://github.com/GEOFBOT/flink/tree/new-iterations
>>>>
>>>> Cheers,
>>>> Geoffrey
>>>>
>>>> On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <[hidden email]> wrote:
>>>>> Hello Geoffrey,
>>>>>
>>>>> i could not reproduce this issue with the commits and plan you
>>>>> provided.
>>>>>
>>>>> I tried out both the FLINK-4098 and bulk-iterations branches (and
>>>>> reverted back to the specified commits) and built Flink from scratch.
>>>>>
>>>>> Could you double check that the code you provided produces the error?
>>>>> Also, which OS/python version are you using?
>>>>>
>>>>> Regards,
>>>>> Chesnay
>>>>>
>>>>> On 20.09.2016 11:13, Chesnay Schepler wrote:
>>>>>> Hello,
>>>>>>
>>>>>> I'll try to take a look this week.
>>>>>>
>>>>>> Regards,
>>>>>> Chesnay
>>>>>>
>>>>>> On 20.09.2016 02:38, Geoffrey Mon wrote:
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I have recently been working on adding bulk iterations to the Python
>>>>>>> API of Flink in order to facilitate a research project I am working on.
>>>>>>> The current changes can be seen in this GitHub diff:
>>>>>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
>>>>>>>
>>>>>>> This implementation seems to work for, at least, simple examples, such
>>>>>>> as incrementing numbers in a data set. However, with the transformations
>>>>>>> required for my project, I get an exception "java.lang.ClassCastException:
>>>>>>> [B cannot be cast to org.apache.flink.api.java.tuple.Tuple" thrown from
>>>>>>> the deserializers called by
>>>>>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
>>>>>>> I've created the following simplified Python plan by stripping down my
>>>>>>> research project code to the problem-causing parts:
>>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>>
>>>>>>> I have been working on this issue but I don't have any ideas on what
>>>>>>> might be the problem. Perhaps someone more knowledgeable about the
>>>>>>> interior of the Python API could kindly help?
>>>>>>>
>>>>>>> Thank you very much.
>>>>>>>
>>>>>>> Geoffrey Mon
>>>>>>>
>>>
>
>


Re: Exception from in-progress implementation of Python API bulk iterations

Geoffrey Mon
Thank you very much. Disabling chaining with the Python API allows my
actual script to run properly. The division by zero must be an issue with
the job that I posted as a gist.

Does that mean the issue must be in the chaining part of the API? As I
understand it, chaining is an important optimization, and it matters for
the performance comparison I want to make in my project.

Cheers,
Geoffrey

On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <[hidden email]> wrote:

> A temporary work around appears to be disabling chaining, which you can
> do by commenting out L215 "self._find_chains()" in Environment.py.
> Note that you then run into a division by zero error, but i can't tell
> whether that is a problem of the job or not.

Re: Exception from in-progress implementation of Python API bulk iterations

Chesnay Schepler-3
The chaining code is definitely related; I also have a pretty clear idea
how to fix it.

The odd thing is that the Java API doesn't catch this type mismatch; the
data types are known when the plan is generated. This kind of error
shouldn't even happen.
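Chesnay's point can be made concrete: if every plan node declares the type it emits and the type it expects, a mismatch can be reported when the plan is generated rather than surfacing as a ClassCastException at runtime. The following is a minimal sketch under that assumption; `Node`, `check_plan` and the type strings are hypothetical and not part of Flink's API. The example edge mirrors the trace reported in this thread, where the iteration tail's serializer expected a byte array (`[B`) but received a `Tuple2`.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Node:
    """Hypothetical plan node with the input type it expects and the type it emits."""
    in_type: str
    out_type: str


def check_plan(nodes: Dict[str, Node], edges: List[Tuple[str, str]]) -> List[str]:
    """Return one message per edge whose producer and consumer types disagree."""
    errors = []
    for src, dst in edges:
        produced = nodes[src].out_type
        expected = nodes[dst].in_type
        if produced != expected:
            errors.append(f"{src} -> {dst}: emits {produced}, expects {expected}")
    return errors


# Usage: a group reduce emitting tuples feeds an iteration tail set up for byte[].
nodes = {
    "source": Node(in_type="none", out_type="byte[]"),
    "group_reduce": Node(in_type="byte[]", out_type="Tuple2"),
    "iteration_tail": Node(in_type="byte[]", out_type="byte[]"),
}
edges = [("source", "group_reduce"), ("group_reduce", "iteration_tail")]
errors = check_plan(nodes, edges)
print(errors)
# prints: ['group_reduce -> iteration_tail: emits Tuple2, expects byte[]']
```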

On 13.10.2016 21:15, Geoffrey Mon wrote:

> Thank you very much. Disabling chaining with the Python API allows my
> actual script to run properly. The division by zero must be an issue with
> the job that I posted on gist.
>
> Does that mean that the issue must be in the chaining part of the API?
> Chaining from the way I understand it is an important optimization that
> would be important for the performance comparison I wish to make in my
> project.
>
> Cheers,
> Geoffrey
>
> On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <[hidden email]> wrote:
>
>> A temporary work around appears to be disabling chaining, which you can
>> do by commenting out L215 "self._find_chains()" in Environment.py.
>> Note that you then run into a division by zero error, but i can't tell
>> whether that is a problem of the job or not.

Re: Exception from in-progress implementation of Python API bulk iterations

Chesnay Schepler-3
In this branch, https://github.com/zentol/flink/tree/new-iterations, you
can find a more fine-grained fix for chaining with iterations. Relevant
commit: ac2305d9589a5c6ab9e94d04c870fba52716d695

On 13.10.2016 23:11, Chesnay Schepler wrote:

> The chaining code is definitely related, I also have a pretty clear
> idea how to fix it.
>
> The odd thing is that the Java API doesn't catch this type mismatch;
> the date types are known when the plan is generated. This kind of error
> shouldn't even happen.
>
> On 13.10.2016 21:15, Geoffrey Mon wrote:
>> Thank you very much. Disabling chaining with the Python API allows my
>> actual script to run properly. The division by zero must be an issue with
>> the job that I posted on gist.
>>
>> Does that mean that the issue must be in the chaining part of the API?
>> Chaining from the way I understand it is an important optimization that
>> would be important for the performance comparison I wish to make in my
>> project.
>>
>> Cheers,
>> Geoffrey
>>
>> On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <[hidden email]>
>> wrote:
>>
>>> A temporary work around appears to be disabling chaining, which you can
>>> do by commenting out L215 "self._find_chains()" in Environment.py.
>>> Note that you then run into a division by zero error, but i can't tell
>>> whether that is a problem of the job or not.

Re: Exception from in-progress implementation of Python API bulk iterations

Geoffrey Mon
Your solution works well, many thanks. It solves the exception that I
described previously.

However, in a different part of the script I have run into another problem
with reusing data sets. For example, given the script at
https://gist.github.com/GEOFBOT/d670f567f8c886572c8715a6058f8b34, I get an
exception about a data type not having a corresponding deserializer for a
broadcast variable. However, no data sets in my plan are broadcast. I
noticed that in the operations diagram generated by Flink, the data set 'S'
is created twice, once for each place it is used in the plan, even though I
intended to reuse the data set. One of the creations involves a broadcast.
Presumably this is related to an optimization built into Flink?
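To make the distinction in the question concrete: in a dataflow plan, reusing a data set typically gives one node with two consumers, whereas re-deriving it for each use gives two separate copies of its lineage. The toy plan builder below is hypothetical (`Plan`, `DataSet` and `derive` are invented names, not the Flink Python API) and says nothing about why Flink's diagram shows 'S' twice; it only shows what each shape looks like.

```python
import itertools
from typing import Dict, List, Optional


class Plan:
    """Hypothetical plan: maps each node id to the ids of the nodes consuming it."""

    def __init__(self) -> None:
        self._ids = itertools.count()
        self.consumers: Dict[int, List[int]] = {}

    def source(self) -> "DataSet":
        return DataSet(self, parent=None)


class DataSet:
    """One DataSet object corresponds to exactly one node in the plan."""

    def __init__(self, plan: Plan, parent: Optional["DataSet"]) -> None:
        self.plan = plan
        self.node_id = next(plan._ids)
        plan.consumers[self.node_id] = []
        if parent is not None:
            plan.consumers[parent.node_id].append(self.node_id)

    def derive(self) -> "DataSet":
        # Any transformation adds a new node that consumes this one.
        return DataSet(self.plan, parent=self)


# Reuse: 'S' is built once and consumed twice.
reused = Plan()
s = reused.source().derive()
s.derive()
s.derive()

# Rebuild: 'S' is derived from scratch for each use, duplicating its lineage.
rebuilt = Plan()
rebuilt.source().derive().derive()
rebuilt.source().derive().derive()

print(len(reused.consumers), len(reused.consumers[s.node_id]))
# prints: 4 2
print(len(rebuilt.consumers))
# prints: 6
```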

I appreciate any insights or help you may have about this problem.

Thanks,
Geoffrey

On Fri, Oct 14, 2016 at 7:24 AM Chesnay Schepler <[hidden email]> wrote:

In this branch: https://github.com/zentol/flink/tree/new-iterations you
can find a more fine-grained fix for chaining with
iterations. relevant commit: ac2305d9589a5c6ab9e94d04c870fba52716d695

On 13.10.2016 23:11, Chesnay Schepler wrote:

> The chaining code is definitely related, I also have a pretty clear
> idea how to fix it.
>
> The odd thing is that the Java API doesn't catch this type mismatch;
> the date types are known when the plan is generated. This kind of error
> shouldn't even happen.
>
> On 13.10.2016 21:15, Geoffrey Mon wrote:
>> Thank you very much. Disabling chaining with the Python API allows my
>> actual script to run properly. The division by zero must be an issue with
>> the job that I posted on gist.
>>
>> Does that mean that the issue must be in the chaining part of the API?
>> Chaining from the way I understand it is an important optimization that
>> would be important for the performance comparison I wish to make in my
>> project.
>>
>> Cheers,
>> Geoffrey
>>
>> On Thu, Oct 13, 2016 at 9:11 AM Chesnay Schepler <[hidden email]>
>> wrote:
>>
>>> A temporary work around appears to be disabling chaining, which you can
>>> do by commenting out L215 "self._find_chains()" in Environment.py.
>>> Note that you then run into a division by zero error, but i can't tell
>>> whether that is a problem of the job or not.
>>>
>>> On 13.10.2016 13:41, Chesnay Schepler wrote:
>>>> Hey Geoffrey,
>>>>
>>>> I was able to reproduce the error and will look into it in more detail
>>>> tomorrow.
>>>>
>>>> Regards,
>>>> Chesnay
>>>>
>>>> On 12.10.2016 23:09, Geoffrey Mon wrote:
>>>>> Hello,
>>>>>
>>>>> Has anyone had a chance to look into this? I am currently working on the
>>>>> problem but I have minimal understanding of how the internal Flink Python
>>>>> API works; any expertise would be greatly appreciated.
>>>>>
>>>>> Thank you very much!
>>>>>
>>>>> Geoffrey
>>>>>
>>>>> On Sat, Oct 8, 2016 at 1:27 PM Geoffrey Mon <[hidden email]> wrote:
>>>>>
>>>>>> Hi Chesnay,
>>>>>>
>>>>>> Heh, I have discovered that if I do not restart Flink after running my
>>>>>> original problematic script, similar issues manifest themselves in
>>>>>> other, otherwise working scripts. I haven't been able to completely
>>>>>> narrow down the problem, but I promise this new script will have a
>>>>>> ClassCastException that is completely reproducible. :)
>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>
>>>>>> Thanks,
>>>>>> Geoffrey
>>>>>>
>>>>>> On Wed, Sep 28, 2016 at 9:16 AM Chesnay Schepler <[hidden email]> wrote:
>>>>>>
>>>>>> Hello Geoffrey,
>>>>>>
>>>>>> This one works for me as well. :D
>>>>>>
>>>>>> Regards,
>>>>>> Chesnay
>>>>>>
>>>>>> On 28.09.2016 05:38, Geoffrey Mon wrote:
>>>>>>> Hello Chesnay,
>>>>>>>
>>>>>>> Thank you for your help. After receiving your message, I recompiled my
>>>>>>> version of Flink completely, and both the NullPointerException listed
>>>>>>> in the TODO and the ClassCastException with the join operation went
>>>>>>> away. Previously, to save time, I had only been recompiling the
>>>>>>> changed modules of Flink using "mvn clean install -pl :module", and
>>>>>>> apparently that may have been causing some of my issues.
>>>>>>>
>>>>>>> Now the problem is clearer: when a specific group reduce function in my
>>>>>>> research project plan file is used within an iteration, I get a
>>>>>>> ClassCastException:
>>>>>>> Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to [B
>>>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.serialize(BytePrimitiveArraySerializer.java:31)
>>>>>>>     at org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector.collect(WorksetUpdateOutputCollector.java:58)
>>>>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>>>>>>>     at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:96)
>>>>>>>     at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
>>>>>>>     at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
>>>>>>>     at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>>>>>>     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>>>>>>>     at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:591)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> I'm not sure why this causes an exception, and I would greatly
>>>>>>> appreciate any assistance. I've revised the bare-bones error-causing
>>>>>>> plan file to focus on this new error source:
>>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>> The group reduce function in question seems to work just fine outside
>>>>>>> of iterations. I have organized the commits and pushed them to a new
>>>>>>> branch to make it easier to test and, hopefully, review soon:
>>>>>>> https://github.com/GEOFBOT/flink/tree/new-iterations
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Geoffrey
>>>>>>>
>>>>>>> On Mon, Sep 26, 2016 at 6:32 AM Chesnay Schepler <[hidden email]> wrote:
>>>>>>>> Hello Geoffrey,
>>>>>>>>
>>>>>>>> I could not reproduce this issue with the commits and plan you
>>>>>>>> provided.
>>>>>>>>
>>>>>>>> I tried out both the FLINK-4098 and bulk-iterations branches (and
>>>>>>>> reverted to the specified commits) and built Flink from scratch.
>>>>>>>>
>>>>>>>> Could you double-check that the code you provided produces the
>>>>>>>> error? Also, which OS and Python version are you using?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Chesnay
>>>>>>>>
>>>>>>>> On 20.09.2016 11:13, Chesnay Schepler wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'll try to take a look this week.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Chesnay
>>>>>>>>>
>>>>>>>>> On 20.09.2016 02:38, Geoffrey Mon wrote:
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I have recently been working on adding bulk iterations to the Python
>>>>>>>>>> API of Flink in order to facilitate a research project I am working
>>>>>>>>>> on. The current changes can be seen in this GitHub diff:
>>>>>>>>>> https://github.com/apache/flink/compare/master...GEOFBOT:e8c9833b43675af66ce897da9880c4f8cd16aad0
>>>>>>>>>>
>>>>>>>>>> This implementation seems to work for at least simple examples, such
>>>>>>>>>> as incrementing numbers in a data set. However, with the
>>>>>>>>>> transformations required for my project, I get the exception
>>>>>>>>>> "java.lang.ClassCastException: [B cannot be cast to
>>>>>>>>>> org.apache.flink.api.java.tuple.Tuple", thrown from the
>>>>>>>>>> deserializers called by
>>>>>>>>>> org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer.
>>>>>>>>>>
>>>>>>>>>> I've created the following simplified Python plan by stripping down
>>>>>>>>>> my research project code to the problem-causing parts:
>>>>>>>>>> https://gist.github.com/GEOFBOT/abb7f81030aab160e6908093ebaa3b4a
>>>>>>>>>>
>>>>>>>>>> I have been working on this issue, but I have no idea what the
>>>>>>>>>> problem might be. Perhaps someone more knowledgeable about the
>>>>>>>>>> internals of the Python API could kindly help?
>>>>>>>>>>
>>>>>>>>>> Thank you very much.
>>>>>>>>>>
>>>>>>>>>> Geoffrey Mon
>>>>>>>>>>
>>>>
>>>
>
>
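For readers puzzled by the `[B` in these exceptions: it is the JVM's runtime class name (as returned by `Class.getName()`) for `byte[]`. So "[B cannot be cast to ...Tuple" means a raw byte array reached code expecting a tuple, and "Tuple2 cannot be cast to [B" means the reverse: a Tuple2 reached `BytePrimitiveArraySerializer`. The small Python helper below is a hypothetical utility, not part of Flink. It decodes such names using the standard JVM encoding: one `[` per array dimension, a single-letter code for primitives, and `L<name>;` for reference types.

```python
# Single-letter element codes used in JVM array class names.
_PRIMITIVE_CODES = {
    "B": "byte", "C": "char", "D": "double", "F": "float",
    "I": "int", "J": "long", "S": "short", "Z": "boolean",
}


def jvm_class_name_to_java(name: str) -> str:
    """Translate a JVM runtime class name (e.g. from a ClassCastException)
    into Java source syntax; non-array names are returned unchanged."""
    dims = len(name) - len(name.lstrip("["))  # one '[' per array dimension
    if dims == 0:
        return name
    elem = name[dims:]
    if elem in _PRIMITIVE_CODES:
        base = _PRIMITIVE_CODES[elem]
    elif elem.startswith("L") and elem.endswith(";"):
        base = elem[1:-1]  # reference type encoded as L<binary name>;
    else:
        raise ValueError(f"not a valid JVM array class name: {name!r}")
    return base + "[]" * dims


print(jvm_class_name_to_java("[B"))  # byte[]
```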