Problem with flink while development

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Problem with flink while development

Jitendra Agrawal
Hi Team,

Problem Description :  When I was calling *reduce()* method on keyedStream
object then getting Ecxeption as

"* org.apache.flink.api.common.InvalidProgramException: Specifying keys via
field positions is only valid for tuple data types. Type: Integer*".

   StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
  KeyedStream integers4 = env.fromElements(1,2,3).keyBy(0);
  DataStream doubleIntegers4 = integers4.reduce(new
ReduceFunction<Integer>() {

@Override
public Integer reduce(Integer arg0, Integer arg1) throws Exception {
return arg0 + arg1;
}
});


can you please tell me how to solve this exception? or can you provide any
link or example where I get these complete Solution..

your help will be appreciated..

Thanks & Regards

 *Jitendra Agrawal *

Mumbai

9167539602

--
This email is sent on behalf of Northgate Public Services (UK) Limited and
its associated companies including Rave Technologies (India) Pvt Limited
(together "Northgate Public Services") and is strictly confidential and
intended solely for the addressee(s).
If you are not the intended recipient of this email you must: (i) not
disclose, copy or distribute its contents to any other person nor use its
contents in any way or you may be acting unlawfully;  (ii) contact
Northgate Public Services immediately on +44(0)1908 264500 quoting the name
of the sender and the addressee then delete it from your system.
Northgate Public Services has taken reasonable precautions to ensure that
no viruses are contained in this email, but does not accept any
responsibility once this email has been transmitted.  You should scan
attachments (if any) for viruses.

Northgate Public Services (UK) Limited, registered in England and Wales
under number 00968498 with a registered address of Peoplebuilding 2,
Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
4NN.  Rave Technologies (India) Pvt Limited, registered in India under
number 117068 with a registered address of 2nd Floor, Ballard House, Adi
Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
Reply | Threaded
Open this post in threaded view
|

Re: Problem with flink while development

Saikat Maitra
Hello Jitendra,

 I am new to Flink community but may have seen this issue earlier. Can you
try to use DataStream<Tuple1<Integer>> instead of KeyedStream integers4

Regards Saikat

On Mon, Apr 18, 2016 at 7:37 PM, Jitendra Agrawal <
[hidden email]> wrote:

> Hi Team,
>
> Problem Description :  When I was calling *reduce()* method on keyedStream
> object then getting Ecxeption as
>
> "* org.apache.flink.api.common.InvalidProgramException: Specifying keys via
> field positions is only valid for tuple data types. Type: Integer*".
>
>    StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>   KeyedStream integers4 = env.fromElements(1,2,3).keyBy(0);
>   DataStream doubleIntegers4 = integers4.reduce(new
> ReduceFunction<Integer>() {
>
> @Override
> public Integer reduce(Integer arg0, Integer arg1) throws Exception {
> return arg0 + arg1;
> }
> });
>
>
> can you please tell me how to solve this exception? or can you provide any
> link or example where I get these complete Solution..
>
> your help will be appreciated..
>
> Thanks & Regards
>
>  *Jitendra Agrawal *
>
> Mumbai
>
> 9167539602
>
> --
> This email is sent on behalf of Northgate Public Services (UK) Limited and
> its associated companies including Rave Technologies (India) Pvt Limited
> (together "Northgate Public Services") and is strictly confidential and
> intended solely for the addressee(s).
> If you are not the intended recipient of this email you must: (i) not
> disclose, copy or distribute its contents to any other person nor use its
> contents in any way or you may be acting unlawfully;  (ii) contact
> Northgate Public Services immediately on +44(0)1908 264500 quoting the name
> of the sender and the addressee then delete it from your system.
> Northgate Public Services has taken reasonable precautions to ensure that
> no viruses are contained in this email, but does not accept any
> responsibility once this email has been transmitted.  You should scan
> attachments (if any) for viruses.
>
> Northgate Public Services (UK) Limited, registered in England and Wales
> under number 00968498 with a registered address of Peoplebuilding 2,
> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
> 4NN.  Rave Technologies (India) Pvt Limited, registered in India under
> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
>
Reply | Threaded
Open this post in threaded view
|

Re: Problem with flink while development

Chenguang He
It throws by 
if (!type.isTupleType()) {
                                throw new InvalidProgramException("Specifying keys via field positions is only valid " +
                                                "for tuple data types. Type: " + type);
                        }


So, like Saikat saying, you may need a stream with tuples, because 
public interface ReduceFunction<T> extends Function, Serializable {

        /**
         * The core method of ReduceFunction, combining two values into one value of the same type.
         * The reduce function is consecutively applied to all values of a group until only a single value remains.
         *
         * @param value1 The first value to combine.
         * @param value2 The second value to combine.
         * @return The combined value of both input values.
         *
         * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        T reduce(T value1, T value2) throws Exception;
}
 



Thanks

Chenguang He
Graduate Student
Department of Computer Science
Iowa State University

On April 18, 2016 at 1:30:19 PM, Saikat Maitra ([hidden email]) wrote:

Hello Jitendra,  

I am new to Flink community but may have seen this issue earlier. Can you  
try to use DataStream<Tuple1<Integer>> instead of KeyedStream integers4  

Regards Saikat  

On Mon, Apr 18, 2016 at 7:37 PM, Jitendra Agrawal <  
[hidden email]> wrote:  

> Hi Team,  
>  
> Problem Description : When I was calling *reduce()* method on keyedStream  
> object then getting Ecxeption as  
>  
> "* org.apache.flink.api.common.InvalidProgramException: Specifying keys via  
> field positions is only valid for tuple data types. Type: Integer*".  
>  
> StreamExecutionEnvironment env =  
> StreamExecutionEnvironment.getExecutionEnvironment();  
> KeyedStream integers4 = env.fromElements(1,2,3).keyBy(0);  
> DataStream doubleIntegers4 = integers4.reduce(new  
> ReduceFunction<Integer>() {  
>  
> @Override  
> public Integer reduce(Integer arg0, Integer arg1) throws Exception {  
> return arg0 + arg1;  
> }  
> });  
>  
>  
> can you please tell me how to solve this exception? or can you provide any  
> link or example where I get these complete Solution..  
>  
> your help will be appreciated..  
>  
> Thanks & Regards  
>  
> *Jitendra Agrawal *  
>  
> Mumbai  
>  
> 9167539602  
>  
> --  
> This email is sent on behalf of Northgate Public Services (UK) Limited and  
> its associated companies including Rave Technologies (India) Pvt Limited  
> (together "Northgate Public Services") and is strictly confidential and  
> intended solely for the addressee(s).  
> If you are not the intended recipient of this email you must: (i) not  
> disclose, copy or distribute its contents to any other person nor use its  
> contents in any way or you may be acting unlawfully; (ii) contact  
> Northgate Public Services immediately on +44(0)1908 264500 quoting the name  
> of the sender and the addressee then delete it from your system.  
> Northgate Public Services has taken reasonable precautions to ensure that  
> no viruses are contained in this email, but does not accept any  
> responsibility once this email has been transmitted. You should scan  
> attachments (if any) for viruses.  
>  
> Northgate Public Services (UK) Limited, registered in England and Wales  
> under number 00968498 with a registered address of Peoplebuilding 2,  
> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2  
> 4NN. Rave Technologies (India) Pvt Limited, registered in India under  
> number 117068 with a registered address of 2nd Floor, Ballard House, Adi  
> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.  
>  
Reply | Threaded
Open this post in threaded view
|

Re: Problem with flink while development

Matthias J. Sax-2
In reply to this post by Saikat Maitra
If you work on plain Integer (or other non-POJO types) you need to
provide a KeySelector to make it work.

For you case something like this:

.keyBy(new KeySelector<Integer, Integer>() {
        @Override
        public Integer getKey(Integer value) throws Exception {
                return value;
        }
})

As Saikat already mentioned, int-based index access to keys works only
for Flink's Tuple types.

For POJO's you can also access key by name.

https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/index.html#datastream-transformations


-Matthias


On 04/18/2016 08:30 PM, Saikat Maitra wrote:

> Hello Jitendra,
>
>  I am new to Flink community but may have seen this issue earlier. Can you
> try to use DataStream<Tuple1<Integer>> instead of KeyedStream integers4
>
> Regards Saikat
>
> On Mon, Apr 18, 2016 at 7:37 PM, Jitendra Agrawal <
> [hidden email]> wrote:
>
>> Hi Team,
>>
>> Problem Description :  When I was calling *reduce()* method on keyedStream
>> object then getting Ecxeption as
>>
>> "* org.apache.flink.api.common.InvalidProgramException: Specifying keys via
>> field positions is only valid for tuple data types. Type: Integer*".
>>
>>    StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>   KeyedStream integers4 = env.fromElements(1,2,3).keyBy(0);
>>   DataStream doubleIntegers4 = integers4.reduce(new
>> ReduceFunction<Integer>() {
>>
>> @Override
>> public Integer reduce(Integer arg0, Integer arg1) throws Exception {
>> return arg0 + arg1;
>> }
>> });
>>
>>
>> can you please tell me how to solve this exception? or can you provide any
>> link or example where I get these complete Solution..
>>
>> your help will be appreciated..
>>
>> Thanks & Regards
>>
>>  *Jitendra Agrawal *
>>
>> Mumbai
>>
>> 9167539602
>>
>> --
>> This email is sent on behalf of Northgate Public Services (UK) Limited and
>> its associated companies including Rave Technologies (India) Pvt Limited
>> (together "Northgate Public Services") and is strictly confidential and
>> intended solely for the addressee(s).
>> If you are not the intended recipient of this email you must: (i) not
>> disclose, copy or distribute its contents to any other person nor use its
>> contents in any way or you may be acting unlawfully;  (ii) contact
>> Northgate Public Services immediately on +44(0)1908 264500 quoting the name
>> of the sender and the addressee then delete it from your system.
>> Northgate Public Services has taken reasonable precautions to ensure that
>> no viruses are contained in this email, but does not accept any
>> responsibility once this email has been transmitted.  You should scan
>> attachments (if any) for viruses.
>>
>> Northgate Public Services (UK) Limited, registered in England and Wales
>> under number 00968498 with a registered address of Peoplebuilding 2,
>> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2
>> 4NN.  Rave Technologies (India) Pvt Limited, registered in India under
>> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
>> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
>>
>


signature.asc (836 bytes) Download Attachment