Hi all,

Until now, the Eclipse JDT compiler was the only compiler that included generic signatures for lambda expressions in class files, which is necessary to use them in a type-safe way in Flink. Unfortunately, this "feature" was considered a "bug" and was removed in Eclipse 4.4.1. This is why lambdas do not work properly with the current version of Eclipse. I have opened a bug for that (see https://bugs.eclipse.org/bugs/show_bug.cgi?id=449063).

The question is: independent of the decision of the Eclipse JDT team, how do we want to deal with missing return type information?

Option 1)
Add a separate TypeInformation argument to each Java API operator. This leads to a blown-up API...
    .map((x)->x + 1, TypeInformation.fromString("Integer"))
    .flatMap((in, out)->out.collect(in), TypeInformation.fromClass(Integer.class))

Option 2)
Introduce a wrapper class which implements ResultTypeQueryable. This leads to complicated syntax...
    .map(TypeHint.map((x)->x + 1, "Integer"));
    .map(TypeHint.map((x)->x + 1, Integer.class));

What are your opinions? Or any other ideas?

Regards,
Timo

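For readers who want to see the missing information directly, here is a minimal sketch using only plain Java reflection. `MapFn` and `SignatureCheck` are hypothetical stand-ins, not Flink classes, and the sketch assumes compilation with javac: an anonymous class records the type arguments of the interface it implements, while a lambda exposes only the raw interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a user function interface (not the Flink MapFunction).
interface MapFn<I, O> {
    O map(I value);
}

class SignatureCheck {
    // True if the function object's class records the type arguments
    // of the first interface it implements.
    static boolean hasGenericSignature(Object fn) {
        Type[] interfaces = fn.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    // Anonymous class: the compiler writes a generic signature into the class file.
    static MapFn<Integer, String> anonymous() {
        return new MapFn<Integer, String>() {
            @Override
            public String map(Integer value) {
                return value.toString();
            }
        };
    }

    // Lambda: the runtime-generated class only implements the raw interface.
    static MapFn<Integer, String> lambda() {
        return value -> value.toString();
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + hasGenericSignature(anonymous()));
        System.out.println("lambda:          " + hasGenericSignature(lambda()));
    }
}
```

This is only an illustration of why a type extractor based on reflection has nothing to work with for lambdas; it does not reproduce what the Eclipse compiler used to emit.
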
Is it possible to use a static method "hint" to create the hinting wrapper function?

Something like

    DataSet.map(hint( (x) -> x.toString() , String.class));

If we go for option (1), I would suggest calling the methods just "from" and overloading them for String, Class, and TypeInformation.

Stephan

On Tue, Oct 28, 2014 at 3:27 PM, Timo Walther <[hidden email]> wrote:
> The question is: independent of the decision of the Eclipse JDT team, how
> do we want to deal with missing return type information?

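A static "hint" method along these lines could look roughly like the sketch below. All names here (`MapFn`, `ResultTypeHint`, `HintedMap`, `Hints`) are hypothetical and only mimic the idea of a wrapper that implements something like ResultTypeQueryable; the real Flink interfaces have different shapes.

```java
import java.util.Objects;

// Hypothetical stand-ins, not the Flink interfaces.
interface MapFn<I, O> {
    O map(I value);
}

interface ResultTypeHint<T> {
    Class<T> getProducedType();
}

// Wrapper that delegates to the lambda and remembers the declared result type.
final class HintedMap<I, O> implements MapFn<I, O>, ResultTypeHint<O> {
    private final MapFn<I, O> delegate;
    private final Class<O> producedType;

    HintedMap(MapFn<I, O> delegate, Class<O> producedType) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
        this.producedType = Objects.requireNonNull(producedType, "producedType");
    }

    @Override
    public O map(I value) {
        return delegate.map(value);
    }

    @Override
    public Class<O> getProducedType() {
        return producedType;
    }
}

final class Hints {
    private Hints() {}

    // Static factory, intended to be statically imported so that calls read as hint(fn, type).
    static <I, O> HintedMap<I, O> hint(MapFn<I, O> fn, Class<O> type) {
        return new HintedMap<>(fn, type);
    }
}
```

With a static import of `Hints.hint`, a call such as `hint((Integer x) -> x.toString(), String.class)` yields a function object that an operator could query for its result type instead of relying on reflection.
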
What do you think about something like:

    env.fromElements(1, 2, 3)
        .flatMap((Integer i, Collector<Integer> o) -> o.collect(i)).returns("Integer")
        .print();

This looks to me like the most readable and user-friendly solution. We only need to change the internals of DataSet a little bit, so that a possible TypeExtractor exception is stored temporarily and thrown by the operator that follows if "returns()" was not called.

Regards,
Timo

On 28.10.2014 15:34, Stephan Ewen wrote:
> Is it possible to use a static method "hint" to create the hinting wrapper
> function?

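The "store the exception and throw it later" idea can be sketched in a few lines. `MiniOperator` is a hypothetical miniature, not the DataSet class: a missing type is represented by `null`, and the stored failure is thrown only when a following step asks for the type without `returns(...)` having been called.

```java
// Hypothetical miniature of an operator whose result type may be unknown at construction time.
final class MiniOperator {
    private Class<?> resultType;                        // null while the type is unknown
    private final RuntimeException extractionFailure;   // stored instead of thrown eagerly

    // extractedType is what a type extractor found, or null if extraction failed.
    MiniOperator(Class<?> extractedType) {
        this.resultType = extractedType;
        this.extractionFailure = extractedType == null
                ? new IllegalStateException(
                        "Return type could not be determined; call returns(...) on this operator")
                : null;
    }

    // The user supplies the missing type after the fact.
    MiniOperator returns(Class<?> type) {
        this.resultType = type;
        return this;
    }

    // Called by whatever operator follows; this is where the deferred failure surfaces.
    Class<?> getResultType() {
        if (resultType == null) {
            throw extractionFailure;
        }
        return resultType;
    }
}
```

The trade-off is visible even in this toy: the constructor can no longer guarantee a usable type, so every consumer of `getResultType()` has to be prepared for a late failure.
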
I think that would look nice.

How easy is that to implement? With that change, we could not initialize the type info in the constructor any more, but would have to change everything to lazy initialization, which makes it complicated and error-prone...

On Wed, Oct 29, 2014 at 4:26 PM, Timo Walther <[hidden email]> wrote:
> What do you think about something like:
>
> env.fromElements(1, 2, 3)
> .flatMap((Integer i, Collector<Integer> o) -> o.collect(i)).returns("Integer")
> .print();

An alternative would be to go for

    env.fromElements(1, 2, 3)
        .flatMap((Integer i, Collector<Integer> o) -> o.collect(i) , returns("Integer"))
        .print();

"returns" would here be a static method that creates the type info.

That would require adding an additional parameter, but would allow us to keep the immediate checks. Deferring the checks will make things harder to understand for users as well...

On 30.10.2014 11:44, "Stephan Ewen" <[hidden email]> wrote:
> I think that would look nice.
>
> How easy is that to implement? With that change, we could not initialize
> the type info in the constructor any more, but would have to change
> everything to lazy initialization, which makes it complicated and error
> prone...

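For comparison, the extra-parameter variant might be sketched as follows. `MiniDataSet`, `TypeHint` and `Hints.returns` are hypothetical names, and the sketch uses a `Class` argument rather than the string form shown in the mail. The point is that the new data set still receives its type in the constructor, so the check stays immediate.

```java
import java.util.Objects;

// Hypothetical stand-in for a user function interface.
interface MapFn<I, O> {
    O map(I value);
}

// Hypothetical carrier for an explicitly declared result type.
final class TypeHint<T> {
    final Class<T> type;

    TypeHint(Class<T> type) {
        this.type = Objects.requireNonNull(type, "type");
    }
}

final class Hints {
    private Hints() {}

    // Static method that creates the type info, as in map(fn, returns(...)).
    static <T> TypeHint<T> returns(Class<T> type) {
        return new TypeHint<>(type);
    }
}

// Hypothetical miniature data set whose type is fixed at construction time.
final class MiniDataSet<T> {
    private final Class<T> type;

    MiniDataSet(Class<T> type) {
        // Eager check: a data set can never exist without a type.
        this.type = Objects.requireNonNull(type, "type");
    }

    // The function itself is ignored in this sketch; only the type handling is shown.
    <R> MiniDataSet<R> map(MapFn<T, R> fn, TypeHint<R> hint) {
        Objects.requireNonNull(fn, "fn");
        return new MiniDataSet<>(hint.type);
    }

    Class<T> getType() {
        return type;
    }
}
```

Usage would read `input.map(i -> i.toString(), Hints.returns(String.class))`, at the cost of one more overload per operator.
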
Hey,

I have made a small prototype for a map operator:

    env.fromElements(1, 2, 3)
        .map((i) -> new Tuple2<String,String>()).returns("Tuple2<String,String>")
        .print();

You can find my solution here: https://github.com/twalthr/incubator-flink/commit/3ce2d3c86cf2457e02986f7a2d858304bbefea58

Actually, I like this solution most, as it looks very easy to the user. Furthermore, we can move the type extraction part into the operator, which makes more sense to me.

What do you think?

Greetings,
Timo

On 02.11.2014 16:22, Stephan Ewen wrote:
> "returns" would here be a static method that creates the type info.
>
> That would require adding an additional parameter, but would allow us to
> keep the immediate checks. Deferring the checks will make things harder to
> understand for users as well...

I like the idea very much!

In my opinion, the DataSet is not quite the right place to put that functionality. I think the UnaryUDFOperator or the BinaryUDFOperator would be better. After all, these hooks are only necessary for UDFs.

One more suggestion:

- Can the TypeExtractor initially return a special "Unknown" type? The returns() method can override that type. Then we can also keep a bit more of the eager initialization.

- The collection execution, for example, works without specific type information. It only needs the ability to clone, which is easily possible with an "unknown" type information, which can create a "default serializer" that simply uses Kryo to clone.

That way, one could also use Java 8 lambdas inside the IDE with collection execution, and on the cluster with the properly compiled code from Maven.

Stephan

On Mon, Nov 3, 2014 at 12:23 PM, Timo Walther <[hidden email]> wrote:
> Actually, I like this solution most as it looks very easy to the user.
> Furthermore, we can move the Type Extraction part into the operator which
> makes more sense to me.

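The "Unknown" placeholder suggested above can be sketched like this. `MiniTypeInfo` and `MiniOperator` are hypothetical, and the copy goes through plain Java serialization as a stand-in for Kryo, so it only works for `Serializable` values. The operator starts out with the placeholder (eager initialization is kept) and `returns(...)` overrides it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Objects;

// Hypothetical miniature type descriptor with a special "unknown" instance.
final class MiniTypeInfo {
    static final MiniTypeInfo UNKNOWN = new MiniTypeInfo(null);

    private final Class<?> type;   // null only for UNKNOWN

    private MiniTypeInfo(Class<?> type) {
        this.type = type;
    }

    static MiniTypeInfo of(Class<?> type) {
        return new MiniTypeInfo(Objects.requireNonNull(type, "type"));
    }

    boolean isKnown() {
        return type != null;
    }

    // Generic deep copy that needs no type knowledge.
    // Stand-in: Java serialization is used here where the thread suggests Kryo.
    Object copy(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not clone value", e);
        }
    }
}

// Hypothetical operator: initialized eagerly with the placeholder type.
final class MiniOperator {
    private MiniTypeInfo typeInfo = MiniTypeInfo.UNKNOWN;

    // Overrides the placeholder with the user-declared type.
    MiniOperator returns(Class<?> type) {
        this.typeInfo = MiniTypeInfo.of(type);
        return this;
    }

    MiniTypeInfo getTypeInfo() {
        return typeInfo;
    }
}
```

Cloning is all that collection-style execution needs in this toy; anything that depends on the concrete type would still have to check `isKnown()` first.
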
I have implemented your idea of an Unknown type which uses the KryoSerializer. Since I don't have type information, I initialize the serializer with Object.class. Collection execution works fine, but when I execute a simple identity mapper job normally, I get the following exception. Is there a way to get this working?

14/11/13 11:01:04 ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(TextOutputFormat (file:/tmp/org.apache.flink.test.javaApiOperators.TypeHintITCase-result) - UTF-8) (1/1)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:175)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:245)
    at java.lang.Thread.run(Thread.java:701)
14/11/13 11:01:04 INFO taskmanager.Task: DataSink(TextOutputFormat (file:/tmp/org.apache.flink.test.javaApiOperators.TypeHintITCase-result) - UTF-8) (1/1) switched to FAILED : java.lang.IllegalStateException: Channel received an event before completing the current partial record.
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:175)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:245)
    at java.lang.Thread.run(Thread.java:701)

On 05.11.2014 22:34, Stephan Ewen wrote:
> - Can the TypeExtractor initially return a special "Unknown" type? The
> returns() method can override that type. Then we can also keep a bit more
> of the eager initialization.

Just my two cents, but the exception is thrown by the lower-layer serializers, which write/read IOReadableWritable types. The respective exception is thrown if a partial record has not been fully deserialized and you receive an event (a channel close event or similar). The corresponding writer part is the RecordWriter class.

I guess the problem is already on the writer side. For further debugging, I would look at what is pushed down to the RecordWriter.

On Thu, Nov 13, 2014 at 11:06 AM, Timo Walther <[hidden email]> wrote:
> I have implemented your idea of an Unknown type which uses the
> KryoSerializer. Since I don't have type information, I initialize the
> serializer with Object.class. Collection execution works fine but when I
> execute a simple identity mapper job normally I get the following
> Exception. Is there a way to get this working?

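To make the failure mode concrete, here is a toy model of a reader that assembles length-prefixed records and rejects an event that arrives mid-record. `ToyRecordReader` and its one-byte length prefix are assumptions made purely for illustration; this is not how Flink's InputChannel or RecordWriter are implemented.

```java
import java.io.ByteArrayOutputStream;

// Toy model, not Flink code: each record is a 1-byte length prefix followed by the payload.
final class ToyRecordReader {
    private int expected = -1;   // payload length of the record being assembled, -1 if none
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream();
    private int completed;

    // Feeds raw bytes from the channel into the reader.
    void onBytes(byte[] chunk) {
        for (byte b : chunk) {
            if (expected < 0) {
                expected = b & 0xFF;          // start of a new record: read the announced length
            } else {
                partial.write(b);             // payload byte of the current record
            }
            if (expected >= 0 && partial.size() == expected) {
                completed++;                  // record is complete, reset for the next one
                expected = -1;
                partial.reset();
            }
        }
    }

    // An event (for example "channel closed") is only legal between records.
    void onEvent() {
        if (expected >= 0) {
            throw new IllegalStateException(
                    "Received an event before completing the current partial record");
        }
    }

    int completedRecords() {
        return completed;
    }
}
```

If the writer announces three payload bytes but only emits two before the close event, `onEvent()` fails in this model, which is the kind of writer/reader mismatch worth looking for.
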
Looks to me like the deserialization is not happening properly, leaving some unconsumed bytes...

On Thu, Nov 13, 2014 at 11:17 AM, Ufuk Celebi <[hidden email]> wrote:
> I guess the problem is at the writer side already. I would have a look what
> is pushed down to the RecordWriter for further debugging.