Type erasure problem solely on cluster execution


Type erasure problem solely on cluster execution

Martin Junghanns
Hi,

I am running into a type erasure problem which only occurs when I
execute the code using a Flink cluster (1.1.2). I created a Gist [1]
which reproduces the problem. I also added a unit test to show that it
does not fail in local and collection mode.

Maybe it is also worth mentioning that, in my actual code, I manually
created a TypeInformation (the same that is automatically created on
local execution) and passed it to the operators using .returns(..).
However, this led to the issue that my field forwarding annotations
failed with invalid reference exceptions (the same annotations work
locally).

The issue came up after I generalized the core of one of our algorithms.
Before, when the types were non-generic, the code ran without problems
both locally and on the cluster.

Thanks in advance!

Cheers, Martin

[1] https://gist.github.com/s1ck/caf9f3f46e7a5afe6f6a73c479948fec

The exception in the Gist case:

The return type of function 'withPojo(Problem.java:58)' could not be
determined automatically, due to type erasure. You can give type
information hints by using the returns(...) method on the result of the
transformation call, or by letting your function implement the
'ResultTypeQueryable' interface.
     org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
     org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
     org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
     Problem.withPojo(Problem.java:60)
     Problem.main(Problem.java:38)
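For context, the underlying mechanism here is plain Java type erasure. The following stdlib-only sketch (class names are made up for illustration; this is neither Flink nor the Gist code) shows why a generic type argument is invisible at runtime, and why capturing it in an anonymous subclass, which is the idea behind hints like returns(...), makes it recoverable:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // At runtime a List<String> and a List<Long> have the same class:
    // the type argument is erased, which is why it cannot be inferred
    // from a generic function's return value alone.
    static Class<?> runtimeClassOf(List<?> list) {
        return list.getClass();
    }

    // A type argument written into a class *declaration* survives
    // erasure. Capturing it via an anonymous subclass is the same
    // idea as providing an explicit type hint.
    abstract static class TypeCapture<T> {
        Type captured() {
            return ((ParameterizedType) getClass().getGenericSuperclass())
                .getActualTypeArguments()[0];
        }
    }

    static String capturedName() {
        return new TypeCapture<List<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(runtimeClassOf(new ArrayList<String>())); // class java.util.ArrayList
        System.out.println(runtimeClassOf(new ArrayList<Long>()));   // same class: erasure
        System.out.println(capturedName()); // java.util.List<java.lang.String>
    }
}
```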


Re: Type erasure problem solely on cluster execution

Fabian Hueske-2
Hi Martin,

thanks for reporting the problem and providing code to reproduce it.

Would you mind describing the problem with the forwarding annotations in
more detail?
I would be interested in the error message and in how the semantic
annotation is provided (@ForwardedFields or withForwardedFields()).

Thanks, Fabian


Re: Type erasure problem solely on cluster execution

Martin Junghanns
Hi Fabian,

Thank you for the quick reply and for looking into it.

Sorry, I was a bit too quick to blame the field references. It turns
out my TypeInformation was wrong, hence the invalid reference exceptions.

However, the type erasure problem still holds.

The actual code can be found here [1]. The code runs fine using the
LocalExecutionEnvironment and it also runs on the cluster when using a
non-Pojo type for T (e.g. java.lang.Long). However, for Pojo types, it
fails on the cluster with a type erasure related exception. Hence, I
manually created the TypeInformation for the Embedding class:

public static <T> TypeInformation<Embedding<T>> getType(Class<T> clazz) {
   TypeInformation<T> type = TypeInformation.of(clazz);
   TypeInformation<T[]> arrayType = ObjectArrayTypeInfo.getInfoFor(type);
   return new TupleTypeInfo<>(arrayType, arrayType);
}

and for the EmbeddingWithTiePoint class:

public static <T> TypeInformation<EmbeddingWithTiePoint<T>> getType(Class<T> clazz) {
   TypeInformation<T> type = TypeInformation.of(clazz);
   TypeInformation<Embedding<T>> embeddingType = Embedding.getType(clazz);
   return new TupleTypeInfo<>(type, embeddingType);
}

Note that this produces the same TypeInformation as the automatic type
extraction does in the working local scenario.

I provided the type info to the UDF which initially creates the
EmbeddingWithTiePoint instances [1]:

DataSet<EmbeddingWithTiePoint<K>> initialEmbeddings = vertices
   .filter(new ElementHasCandidate<>(traversalCode.getStep(0).getFrom()))
   .map(new BuildEmbeddingWithTiePoint<>(keyClass, traversalCode, vertexCount, edgeCount))
   .returns(EmbeddingWithTiePoint.getType(keyClass));

However, Flink tells me that I now need to provide the same type
information at all places where the output is of type
EmbeddingWithTiePoint [2], [3]. If I do so, the program fails with a
class cast exception:

Caused by: java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.tuples.EmbeddingWithTiePoint
     at org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.UpdateEdgeMappings.join(UpdateEdgeMappings.java:50)
     at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:149)
     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:222)
     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
     at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
     at java.lang.Thread.run(Thread.java:745)

I guess the issue is not really the missing TypeInformation, but
something that is done differently in cluster execution with Pojo
types. Maybe it is related to the generic array creation via
reflection? Hope this helps.
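To illustrate the reflection suspicion in plain Java (stdlib only, illustrative class and method names, not the Gradoop code): an unchecked (T[]) new Object[n] cast compiles but blows up with a ClassCastException at the call site, whereas java.lang.reflect.Array.newInstance creates an array whose runtime component type is actually correct:

```java
import java.lang.reflect.Array;

public class GenericArrayDemo {

    // Compiles with an unchecked warning, but the runtime type stays
    // Object[]; the cast to T[] only fails later at a call site.
    @SuppressWarnings("unchecked")
    static <T> T[] unsafeArray(int n) {
        return (T[]) new Object[n];
    }

    // Reflective creation: the runtime component type really is clazz,
    // so the resulting array can safely be used as T[].
    @SuppressWarnings("unchecked")
    static <T> T[] safeArray(Class<T> clazz, int n) {
        return (T[]) Array.newInstance(clazz, n);
    }

    public static void main(String[] args) {
        String[] ok = safeArray(String.class, 2);
        System.out.println(ok.getClass().getComponentType()); // class java.lang.String

        try {
            String[] bad = GenericArrayDemo.<String>unsafeArray(2);
            System.out.println("no exception?");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException as expected");
        }
    }
}
```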

Best, Martin

[1]
https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L170

[2]
https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L215

[3]
https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L234
