Hi
I am getting the following error in previously working code with the new version 1.4.0.

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of com.goibibo.NewClass$$anonfun$main$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$6.cleanFun$5 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$6
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
    ... 10 more

--
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing Jabalpur
Mobile No: (+91) 8882114744
Email: [hidden email]
LinkedIn: https://www.linkedin.com/in/28shivamsharma

Basically, I am registering one ScalarFunction in my code. Has anything changed in how a user-defined function is written or registered in Flink 1.4.0?

On Wed, Dec 13, 2017 at 11:55 PM, Shivam Sharma <[hidden email]> wrote:
> I am getting following issues in working code in new version 1.4.0.

--
Shivam Sharma

Hi,
Could you please provide a bit of context? From your second email I gather that you're using the Table API, is that right?

Best,
Aljoscha

> On 14. Dec 2017, at 07:08, Shivam Sharma <[hidden email]> wrote:
>
> Basically I am registering one scalarfunction in my code. Is there any
> change in writing or registering a User Defined function in flink 1.4.0?

Yeah, I am using the Table API of Flink.

I am creating my user-defined function like this:

    import java.text.SimpleDateFormat
    import java.util.Date

    import org.apache.flink.table.functions.ScalarFunction

    // Scalar UDF that re-formats a "yyyy-MM-dd HH:mm:ss" timestamp string
    // into the pattern given by destFormat.
    class DateTimeUDF extends ScalarFunction {

      def eval(timeStamp: String, destFormat: String): String = {
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val date: Date = format.parse(timeStamp)
        new SimpleDateFormat(destFormat).format(date)
      }
    }

And I am registering this function on the table environment like this:

    tableEnv.registerFunction("changeDTFormat", new DateTimeUDF())

Note: my code works fine in Flink 1.3.2.

On Thu, Dec 14, 2017 at 11:42 AM, Aljoscha Krettek <[hidden email]> wrote:
> Could you please provide a bit of context. From your second email I gather
> that you're using the Table API, is that right?

--
Shivam Sharma

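For reference, the conversion that eval performs can be tried outside Flink, which helps separate the date-formatting logic from the deserialization failure in the stack trace. This is only a minimal sketch: the object name DateTimeFormatSketch and the sample input are assumptions made for illustration and are not part of the original job, and it uses only java.text.SimpleDateFormat from the JDK.

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical stand-alone helper (not part of Flink or of the original job).
// It mirrors the body of DateTimeUDF.eval so the logic can be run without a cluster.
object DateTimeFormatSketch {

  def changeDTFormat(timeStamp: String, destFormat: String): String = {
    // Parse the input using the fixed source pattern from the UDF.
    val sourceFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val date: Date = sourceFormat.parse(timeStamp)
    // Re-format with the caller-supplied pattern (same default time zone and locale).
    new SimpleDateFormat(destFormat).format(date)
  }

  def main(args: Array[String]): Unit = {
    // Sample input chosen for illustration only.
    println(changeDTFormat("2017-12-14 08:52:00", "dd/MM/yyyy HH:mm"))
  }
}
```

If this runs as expected locally, the formatting code itself is probably not what the ClassCastException is complaining about; the trace points at a Scala closure (com.goibibo.NewClass$$anonfun$main$1) being deserialized on the task side.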
Can you give a bit more context?
When does the error occur? Does it happen in the client (i.e., when the query is optimized and the plan is generated) or in the JobManager or TaskManager when the plan is submitted to the cluster?

Do you try to start from a savepoint?

Thank you,
Fabian

2017-12-14 8:52 GMT+01:00 Shivam Sharma <[hidden email]>:
> yeah I am using Table API of flink.