Hi squirrels,
I have 2 problems with the new Either type and I could use your help to understand them.

1. I have a piece of code that looks like this:

    TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo = ...
    DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet =
            initialVertices.map(...).returns(workSetTypeInfo);

This gives me the following exception:

    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.graph.spargelnew.MessagePassingIteration$InitializeWorkSet@75ba8574 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
        at org.apache.flink.api.java.DataSet.clean(DataSet.java:184)
        at org.apache.flink.api.java.DataSet.map(DataSet.java:214)
        at org.apache.flink.graph.spargelnew.MessagePassingIteration.createResult(MessagePassingIteration.java:160)
        at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1190)
        at org.apache.flink.graph.Graph.runMessagePassingIteration(Graph.java:1650)
        at org.apache.flink.graph.spargelnew.example.PregelCC.main(PregelCC.java:53)
    Caused by: java.io.NotSerializableException: org.apache.flink.api.java.typeutils.Either$Left
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)

Making Either implement java.io.Serializable solves this, but I am wondering why this is needed.
Since I'm registering the typeinfo with returns(), shouldn't the EitherTypeSerializer be registered too? Also, this seems to be the only operation where I get this error, even though I'm using the Either type in other places as well.

2. The second problem appeared after rebasing to the current master, which contains a fix for FLINK-3046 (Integrate the Either Java type with the TypeExtractor):

    Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'Message' in 'class org.apache.flink.graph.spargelnew.MessagePassingIteration$AppendVertexState' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
        at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:706)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:458)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:713)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:425)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getBinaryOperatorReturnType(TypeExtractor.java:320)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatJoinReturnTypes(TypeExtractor.java:176)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatJoinReturnTypes(TypeExtractor.java:170)
        at org.apache.flink.api.java.operators.JoinOperator$DefaultJoin.with(JoinOperator.java:562)
        at org.apache.flink.graph.spargelnew.MessagePassingIteration.createResult(MessagePassingIteration.java:171)
        at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1190)
        at org.apache.flink.graph.Graph.runMessagePassingIteration(Graph.java:1650)
        at org.apache.flink.graph.spargelnew.example.PregelCC.main(PregelCC.java:53)

The code giving this exception is the following:

    DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs =
            iteration.getSolutionSet().join(iteration.getWorkset())
                .where(0).equalTo(0)
                .with(new AppendVertexState<K, VV, Message>())
                .returns(new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>(vertexType, nullableMsgTypeInfo));

Do I need to register the Either typeinfo differently now that it's integrated with the TypeExtractor, or is this a bug?

If you want to see the complete code, I've pushed it here:
https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/MessagePassingIteration.java#L168

Thanks!
-Vasia.
Hi Vasia,
Regarding your TypeExtractor problem: the TypeExtractor works correctly. The with() function of the JoinOperator calls the wrong TypeExtractor method, one that does not allow missing type information. This is a bug. Can you open an issue for that?

Regards,
Timo

On 28.11.2015 20:18, Vasiliki Kalavri wrote:
> 2. The second problem appeared after rebasing to the current master,
> containing a fix for FLINK-3046 (Integrate the Either Java type with the
> TypeExtractor).
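Why the TypeExtractor cannot find `Message` can be seen with plain Java reflection. This is a minimal sketch, not Flink's TypeExtractor code: `Fn` and `AppendState` are hypothetical stand-ins for a join function interface and for `AppendVertexState<K, VV, Message>`. A class declared as `AppendState<M> implements Fn<String, M>` only records the type variable `M` in its signature, even when instantiated as `new AppendState<Integer>()`, because the type argument at the call site is erased. An anonymous subclass with concrete type arguments, by contrast, keeps them in its class signature.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

class ErasureDemo {
    // Hypothetical stand-in for a Flink function interface: IN -> OUT.
    interface Fn<IN, OUT> {
        OUT apply(IN in);
    }

    // Hypothetical stand-in for AppendVertexState<K, VV, Message>: the output
    // type M does not appear in the input type, so it cannot be deduced from it.
    static class AppendState<M> implements Fn<String, M> {
        @Override
        public M apply(String in) {
            return null;
        }
    }

    // Reads the OUT type argument that fn's class declares for Fn.
    static Type declaredOutputType(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Fn.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1];
            }
        }
        throw new IllegalArgumentException("not an Fn implementation");
    }

    // Erased case: the call site says Integer, but the class only records M.
    static Type erasedOutput() {
        return declaredOutputType(new AppendState<Integer>());
    }

    // Concrete case: an anonymous subclass bakes Integer into its class signature.
    static Type concreteOutput() {
        Fn<String, Integer> fn = new Fn<String, Integer>() {
            @Override
            public Integer apply(String in) {
                return in.length();
            }
        };
        return declaredOutputType(fn);
    }

    public static void main(String[] args) {
        Type erased = erasedOutput();
        System.out.println(erased + " is a type variable: " + (erased instanceof TypeVariable));
        System.out.println("concrete: " + concreteOutput());
    }
}
```

This is consistent with Timo's point: the extractor is right that `Message` is unknown, so the output type has to come from the explicit `returns(...)` hint.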
It seems there is an Either.Left stored somewhere in the Object. Could that be?
> On 28 Nov 2015, at 20:18, Vasiliki Kalavri wrote:
>
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
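Aljoscha's guess can be checked outside Flink. A minimal sketch, assuming hypothetical stand-ins `Left` (playing the role of `Either.Left` before it implemented `java.io.Serializable`) and `InitializeWorkSetLike`: writing the function object with a plain `ObjectOutputStream`, which is what the trace shows `InstantiationUtil.serializeObject` doing, fails with a `NotSerializableException` whose message names the field's class rather than the function's own class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCheck {
    // Hypothetical stand-in for Either.Left before it implemented Serializable.
    static final class Left<L> {
        final L value;

        Left(L value) {
            this.value = value;
        }
    }

    // Hypothetical stand-in for InitializeWorkSet: the function itself is
    // Serializable, but it holds a pre-built Left instance as a field.
    static final class InitializeWorkSetLike implements Serializable {
        private static final long serialVersionUID = 1L;
        final Left<String> noMessage = new Left<>("none");
    }

    // Serializes o with Java serialization and returns the name of the class
    // that could not be serialized, or null if serialization succeeded.
    static String firstNonSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // by default the message is the offending class name
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstNonSerializable(new InitializeWorkSetLike()));
    }
}
```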
I checked the code, and the MapFunction InitializeWorkSet does indeed contain two instances of type Either. Since we use Java serialization to ship the operators to the cluster, the MapFunction has to be serializable. Either you make the Either type serializable, or you create these instances in the open method of the operator.

Cheers,
Till

On Mon, Nov 30, 2015 at 11:03 AM, Aljoscha Krettek wrote:
> It seems there is an Either.Left stored somewhere in the Object. Could
> that be?
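Till's second suggestion can be sketched in plain Java. This is a minimal sketch under stated assumptions: `Left` and `InitializeWorkSetLike` are hypothetical stand-ins, and `open()` here is an ordinary method standing in for the `open(Configuration)` hook of a Flink rich function such as `RichMapFunction`. The pre-built instance becomes a `transient` field, which Java serialization skips, and it is recreated after the function has been deserialized, so shipping succeeds even though `Left` is not `Serializable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class LazyInitSketch {
    // Hypothetical stand-in for a non-Serializable Either.Left.
    static final class Left<L> {
        final L value;

        Left(L value) {
            this.value = value;
        }
    }

    // Hypothetical mapper: the Left instance is not shipped; it is built in open().
    static final class InitializeWorkSetLike implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Left<String> noMessage; // skipped by Java serialization

        // Stand-in for a rich function's open() lifecycle hook.
        void open() {
            noMessage = new Left<>("none");
        }

        String map(String vertexId) {
            return vertexId + ":" + noMessage.value;
        }
    }

    // Simulates shipping the function: serialize to bytes, then deserialize a copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Ship, open, then run the function on one record.
    static String shipAndRun(String vertexId) {
        try {
            InitializeWorkSetLike shipped = roundTrip(new InitializeWorkSetLike());
            shipped.open();
            return shipped.map(vertexId);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(shipAndRun("v1")); // prints "v1:none"
    }
}
```

The other option Till mentions, making Either itself implement java.io.Serializable, avoids the lifecycle step, but Java serialization then also requires every value wrapped inside the Either to be serializable.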
Hi,
thanks a lot for your answers :)

@Timo: sure, I can open an issue, but I'm not sure I understand the problem well enough to describe it properly. The with() method calls getFlatJoinReturnTypes() with the allowMissing parameter set to false. Is this what you're referring to? What does this parameter do, and when should it be true or false? Thanks!

@Till: thanks for the explanation! I didn't realize Java serialization was involved there.

Cheers,
-V.

On 30 November 2015 at 11:09, Till Rohrmann wrote:
> Since we use Java serialization to ship the
> operators to the cluster, the MapFunction has to be serializable.
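A minimal sketch of the idea behind Timo's remark, using hypothetical names (`TypeResult`, `extract`, `Operator`) rather than Flink's actual classes, with `IllegalStateException` standing in for `InvalidTypesException`. When missing type information is allowed, a failed extraction yields a placeholder instead of throwing, so a later `returns()` call can still supply the type, and an error is raised only if the type is requested while still missing. With `allowMissing` set to false the exception escapes from `with()` itself, before `returns()` is ever reached, which matches the trace in problem 2. (Flink has a `MissingTypeInfo` class that plays a similar placeholder role, but the code below does not use it.)

```java
class AllowMissingSketch {
    // Hypothetical stand-in for extracted type information. A null typeName
    // marks a placeholder for a type that could not be extracted.
    static final class TypeResult {
        final String typeName;
        final RuntimeException cause;

        private TypeResult(String typeName, RuntimeException cause) {
            this.typeName = typeName;
            this.cause = cause;
        }

        static TypeResult of(String typeName) {
            return new TypeResult(typeName, null);
        }

        static TypeResult missing(RuntimeException cause) {
            return new TypeResult(null, cause);
        }
    }

    // Hypothetical extractor: 'deducible' says whether every type variable of
    // the output can be inferred from the input types.
    static TypeResult extract(boolean deducible, boolean allowMissing) {
        if (deducible) {
            return TypeResult.of("Tuple2<Long, Long>");
        }
        RuntimeException error =
                new IllegalStateException("Type of TypeVariable 'Message' could not be determined");
        if (allowMissing) {
            return TypeResult.missing(error); // defer: returns() may still supply the type
        }
        throw error; // fail now: returns() is never reached
    }

    // Hypothetical operator mimicking the with(...).returns(...) call chain.
    static final class Operator {
        private TypeResult type;

        Operator(TypeResult type) {
            this.type = type;
        }

        Operator returns(String explicitType) {
            this.type = TypeResult.of(explicitType); // explicit hint replaces the placeholder
            return this;
        }

        String getType() {
            if (type.typeName == null) {
                throw new IllegalStateException(
                        "Return type could not be determined automatically; use returns()", type.cause);
            }
            return type.typeName;
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator(extract(false, true))
                .returns("Tuple2<Vertex<K, VV>, Either<NullValue, Message>>");
        System.out.println(op.getType());
    }
}
```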