Hi Team,
Problem description: When I call the reduce() method on a KeyedStream object, I get the exception "org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: Integer".

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    KeyedStream integers4 = env.fromElements(1,2,3).keyBy(0);
    DataStream doubleIntegers4 = integers4.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer arg0, Integer arg1) throws Exception {
            return arg0 + arg1;
        }
    });

Can you please tell me how to solve this exception, or point me to a link or example with a complete solution?

Your help will be appreciated.

Thanks & Regards
Jitendra Agrawal
Mumbai
Hello Jitendra,
I am new to the Flink community, but I may have seen this issue before. Can you try using DataStream<Tuple1<Integer>> instead of KeyedStream integers4?

Regards
Saikat

On Mon, Apr 18, 2016 at 7:37 PM, Jitendra Agrawal <[hidden email]> wrote:
> KeyedStream integers4 = env.fromElements(1,2,3).keyBy(0);
The exception is thrown by this check:

    if (!type.isTupleType()) {
        throw new InvalidProgramException("Specifying keys via field positions is only valid " +
            "for tuple data types. Type: " + type);
    }

So, as Saikat says, you may need a stream of tuples, because keyBy(0) selects the key by field position. ReduceFunction itself is generic and only requires that both inputs and the result have the same type:

    public interface ReduceFunction<T> extends Function, Serializable {
        /**
         * The core method of ReduceFunction, combining two values into one value of the same type.
         * The reduce function is consecutively applied to all values of a group until only a single value remains.
         *
         * @param value1 The first value to combine.
         * @param value2 The second value to combine.
         * @return The combined value of both input values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        T reduce(T value1, T value2) throws Exception;
    }

Thanks
Chenguang He
Graduate Student
Department of Computer Science
Iowa State University

On April 18, 2016 at 1:30:19 PM, Saikat Maitra ([hidden email]) wrote:
> Can you try to use DataStream<Tuple1<Integer>> instead of KeyedStream integers4
In reply to this post by Saikat Maitra
If you work on plain Integer (or other non-POJO types), you need to provide a KeySelector to make it work. For your case, something like this:

    .keyBy(new KeySelector<Integer, Integer>() {
        @Override
        public Integer getKey(Integer value) throws Exception {
            return value;
        }
    })

As Saikat already mentioned, int-based index access to keys works only for Flink's Tuple types. For POJOs, you can also access the key by field name.

https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/index.html#datastream-transformations

-Matthias

On 04/18/2016 08:30 PM, Saikat Maitra wrote:
> Can you try to use DataStream<Tuple1<Integer>> instead of KeyedStream integers4
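One thing to keep in mind with a KeySelector that returns the value itself: reduce() on a keyed stream combines elements per key, so 1, 2 and 3 each end up in their own group and are never added together. The following is only a plain-Java sketch of that per-key rolling behaviour, with no Flink involved. The class KeyedReduceSketch and the helper rollingReduce are hypothetical names made up for this illustration, not part of any Flink API, and the sketch assumes that a keyed reduce emits the updated running value of a key for every incoming element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class KeyedReduceSketch {

    // Hypothetical helper (not a Flink API): for each input element, combine it with the
    // running value of its key group and record the updated running value.
    static <T, K> List<T> rollingReduce(List<T> input,
                                        Function<T, K> keySelector,
                                        BinaryOperator<T> reducer) {
        Map<K, T> runningValuePerKey = new HashMap<>();
        List<T> emitted = new ArrayList<>();
        for (T element : input) {
            K key = keySelector.apply(element);
            // merge() stores the element if the key is new, otherwise reducer(old, element).
            T updated = runningValuePerKey.merge(key, element, reducer);
            emitted.add(updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3);

        // Key = the value itself: every element is alone in its group.
        System.out.println(rollingReduce(input, value -> value, Integer::sum)); // [1, 2, 3]

        // Key = a constant: all elements share one group, giving a running sum.
        System.out.println(rollingReduce(input, value -> 0, Integer::sum));     // [1, 3, 6]
    }
}
```

So if the goal is a sum over all elements, a key selector that maps every element to the same key may be closer to what is wanted, at the cost of processing that one group without parallelism.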