Hi,
I am running into a type erasure problem which only occurs when I execute the code on a Flink cluster (1.1.2). I created a Gist [1] which reproduces the problem. I also added a unit test to show that it does not fail in local and collection mode.

Maybe it is also interesting to mention that, in my actual code, I manually created a TypeInformation (the same one that is automatically created on local execution) and gave it to the operators using .returns(..). However, this led to the issue that my field forwarding annotations failed with invalid reference exceptions (the same annotations that work locally).

The issue came up after I generalized the core of one of our algorithms. Before, when the types were non-generic, the code ran without problems both locally and on the cluster.

Thanks in advance!

Cheers, Martin

[1] https://gist.github.com/s1ck/caf9f3f46e7a5afe6f6a73c479948fec

The exception in the Gist case:

    The return type of function 'withPojo(Problem.java:58)' could not be
    determined automatically, due to type erasure. You can give type
    information hints by using the returns(...) method on the result of the
    transformation call, or by letting your function implement the
    'ResultTypeQueryable' interface.
      org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
      org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
      org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
      Problem.withPojo(Problem.java:60)
      Problem.main(Problem.java:38)
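For readers who cannot open the Gist: the failing pattern is, in essence, a generic helper method whose UDF loses its type parameter to erasure. Below is a minimal sketch of that pattern, assuming nothing beyond the error message quoted above; the names ErasureSketch and wrap are illustrative and are not the actual Gist code. The returns(..) call shows the hint the error message suggests.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;

    public class ErasureSketch {

      // Generic helper: the anonymous MapFunction's type variable T is
      // erased at runtime, so the type extractor cannot determine the
      // return type on its own.
      static <T> DataSet<Tuple1<T>> wrap(DataSet<T> input, Class<T> clazz) {
        return input
          .map(new MapFunction<T, Tuple1<T>>() {
            @Override
            public Tuple1<T> map(T value) {
              return new Tuple1<>(value);
            }
          })
          // The hint the error message asks for: an explicitly constructed
          // TypeInformation handed to the operator via returns(..).
          .returns(new TupleTypeInfo<>(TypeInformation.of(clazz)));
      }

      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        wrap(env.fromElements(1L, 2L, 3L), Long.class).print();
      }
    }

Without the returns(..) line, Flink records the extraction failure and only throws when getType() is eventually called, which matches the collect()/print() frames in the stack trace above.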
Hi Martin,
thanks for reporting the problem and providing code to reproduce it.

Would you mind describing the problem with the forwarding annotations in more detail? I would be interested in the error message and in how the semantic annotation is provided (@ForwardedFields or withForwardedFields()).

Thanks, Fabian
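For context, these are the two equivalent ways to declare forwarded fields in the DataSet API. A minimal sketch; the function ValueToLength and its tuple types are made up for illustration and are not Martin's code.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Variant 1: the annotation form. Declares that field 0 of the input
    // tuple is copied unchanged to field 0 of the output tuple.
    @ForwardedFields("f0")
    public class ValueToLength
        implements MapFunction<Tuple2<Long, String>, Tuple2<Long, Integer>> {
      @Override
      public Tuple2<Long, Integer> map(Tuple2<Long, String> in) {
        return new Tuple2<>(in.f0, in.f1.length());
      }
    }

Variant 2 attaches the same information at the call site instead:

    input.map(new ValueToLength()).withForwardedFields("f0");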
Hi Fabian,
Thank you for the quick reply and for looking into it. Sorry, I was a bit too quick with the field reference accusation. It turns out that my TypeInformation was wrong, hence the invalid reference exception. However, the type erasure problem still holds.

The actual code can be found here [1]. The code runs fine using the LocalExecutionEnvironment, and it also runs on the cluster when using a non-POJO type for T (e.g. java.lang.Long). However, for POJO types, it fails on the cluster with a type erasure related exception. Hence, I manually created the TypeInformation for the Embedding class:

    public static <T> TypeInformation<Embedding<T>> getType(Class<T> clazz) {
      TypeInformation<T> type = TypeInformation.of(clazz);
      TypeInformation<T[]> arrayType = ObjectArrayTypeInfo.getInfoFor(type);
      return new TupleTypeInfo<>(arrayType, arrayType);
    }

and for the EmbeddingWithTiePoint class:

    public static <T> TypeInformation<EmbeddingWithTiePoint<T>> getType(Class<T> clazz) {
      TypeInformation<T> type = TypeInformation.of(clazz);
      TypeInformation<Embedding<T>> embeddingType = Embedding.getType(clazz);
      return new TupleTypeInfo<>(type, embeddingType);
    }

Note that this produces the same TypeInformation as the automatic type extraction does in the local, working scenario. I provided the type info to the UDF which initially creates the EmbeddingWithTiePoint instances [1]:

    DataSet<EmbeddingWithTiePoint<K>> initialEmbeddings = vertices
      .filter(new ElementHasCandidate<>(traversalCode.getStep(0).getFrom()))
      .map(new BuildEmbeddingWithTiePoint<>(keyClass, traversalCode,
        vertexCount, edgeCount))
      .returns(EmbeddingWithTiePoint.getType(keyClass));

However, Flink tells me that I now need to provide the same type information at all places where the output is of type EmbeddingWithTiePoint [2], [3]. If I do so, the program fails with a class cast exception:

    Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.tuples.EmbeddingWithTiePoint
      at org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions.UpdateEdgeMappings.join(UpdateEdgeMappings.java:50)
      at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:149)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:222)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
      at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
      at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
      at java.lang.Thread.run(Thread.java:745)

I guess the issue is not really the missing TypeInformation, but something that is done differently when using cluster execution and POJO types. Maybe related to the generic array creation via reflection? Hope this helps.
Best, Martin

[1] https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L170
[2] https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L215
[3] https://github.com/dbs-leipzig/gradoop/blob/master/gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/single/preserving/explorative/DistributedTraverser.java#L234
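A note on the ClassCastException above, offered as a hypothesis rather than a confirmed diagnosis: new TupleTypeInfo<>(types...) records the generic Tuple2 class as the tuple class, so its serializer instantiates plain Tuple2 objects, and downstream UDFs that cast to the Tuple2 subclass EmbeddingWithTiePoint fail once records actually pass through (de)serialization on the cluster; automatic extraction, by contrast, records the concrete subclass. If that is the cause, the (Class, TypeInformation...) constructor of TupleTypeInfo is a candidate fix. A self-contained sketch with a hypothetical Wrapper class standing in for EmbeddingWithTiePoint:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;

    public class SubclassTypeInfoSketch {

      // Stand-in for EmbeddingWithTiePoint: a Tuple2 subclass with the
      // public no-arg constructor that Flink's tuple serializer needs.
      public static class Wrapper<T> extends Tuple2<T, T> {
        public Wrapper() {
        }
      }

      // Passing the subclass explicitly makes the serializer create Wrapper
      // instances instead of plain Tuple2, so downstream casts succeed.
      @SuppressWarnings("unchecked")
      public static <T> TypeInformation<Wrapper<T>> getType(Class<T> clazz) {
        TypeInformation<T> type = TypeInformation.of(clazz);
        return new TupleTypeInfo<>(
          (Class<Wrapper<T>>) (Class<?>) Wrapper.class, type, type);
      }
    }

Implementing ResultTypeQueryable on the producing UDFs, the other route the original exception message suggests, would additionally avoid repeating returns(..) at every downstream operator.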