Ryan Brideau created FLINK-8256:
-----------------------------------

             Summary: Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
                 Key: FLINK-8256
                 URL: https://issues.apache.org/jira/browse/FLINK-8256
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.4.0
         Environment: macOS, Local Flink v1.4.0, Scala 2.11
            Reporter: Ryan Brideau

I built the newest release locally today, but when I try to filter a stream using an anonymous or named function, I get an error. Here's a simple example:

{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object TestFunction {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val someArray = Array(1,2,3)
    val stream = env.fromCollection(someArray).filter(_ => true)
    stream.print().setParallelism(1)
    env.execute("Testing Function")
  }
}
{code}

This results in:

{code:java}
Job execution switched to status FAILING.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
	... 6 more
12/13/2017 15:10:01	Job execution switched to status FAILED.

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
	at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
	at org.peopleinmotion.TestFunction.main(TestFunction.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
{code}

However, replacing the function with this results in everything working as expected:

{code:java}
val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
  override def filter(t: Int): Boolean = true
})
{code}

Perhaps something changed in the new build compared to the previous, as this was working without issue before?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
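One point worth noting about the trace above: the ClassCastException is thrown while the task manager deserializes the user function, not while the client serializes it. Serialization of a Scala function literal is not the problem per se; a plain function literal round-trips through Java serialization fine within a single JVM and classloader. A minimal, self-contained sketch (hypothetical names, no Flink dependency) illustrating that:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LambdaRoundTrip {
  // Serialize and deserialize an object within the same JVM and classloader.
  def roundTrip[T](value: T): T = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(value)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject()
      .asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // Scala compiles function literals to serializable classes,
    // so the write/read itself succeeds here.
    val pred: Int => Boolean = _ => true
    val copy = roundTrip(pred)
    println(copy(42)) // prints "true"
  }
}
```

Since the same bytes deserialize fine in-process, the failure in the report points at how `TestFunction$$anonfun$1` is resolved on the receiving side rather than at the function literal itself.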
I am facing the same issue:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html

Can anyone explain the exception?

Thanks

On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <[hidden email]> wrote:
> Ryan Brideau created FLINK-8256: Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

--
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing Jabalpur
Mobile No- (+91) 8882114744
Email:- [hidden email]
LinkedIn:- https://www.linkedin.com/in/28shivamsharma
Hi!
This may be due to the changed classloading semantics.

Just to verify this, can you check whether it gets solved by setting the following in the Flink configuration: "classloader.resolve-order: parent-first"?

By default, Flink 1.4 now uses inverted (child-first) classloading to allow users to use their own copies of dependencies, irrespective of what the underlying classpath is polluted with. You can, for example, use a different Avro version than the one Hadoop pulls in, even without shading, or different Akka / Jackson / etc. versions.

That is a nice improvement, but it may have some impact on tools that were built before. When you see ClassCastExceptions of the form "X cannot be cast to X", the likely cause is that the classloader duplicates a dependency from the JVM classpath in user space, while objects/classes move between the two domains.

Stephan

On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <[hidden email]> wrote:
> I am facing the same issue:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
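The "X cannot be cast to X" effect Stephan describes can be demonstrated outside Flink: the same class bytes loaded through a second classloader that does not delegate to the application classloader yield a distinct runtime class, so instances cannot be assigned across the two. A minimal sketch (hypothetical names; it assumes the Scala library jar appears on `java.class.path`):

```scala
import java.io.File
import java.net.{URL, URLClassLoader}

object ClassLoaderDemo {
  // Load `className` through a fresh loader that does NOT delegate to the
  // application classloader (parent = null, i.e. bootstrap only), roughly
  // mimicking a child-first user-code classloader over the same jars.
  def isolatedCopyOf(className: String): Class[_] = {
    val urls: Array[URL] = System.getProperty("java.class.path")
      .split(File.pathSeparator)
      .map(p => new File(p).toURI.toURL)
    new URLClassLoader(urls, null).loadClass(className)
  }

  def main(args: Array[String]): Unit = {
    val dup = isolatedCopyOf("scala.Option")
    println(dup.getName)               // "scala.Option": same name...
    println(dup eq classOf[Option[_]]) // false: ...but a different runtime
                                       // class, so casts between the two fail
  }
}
```

This is why "parent-first" resolution makes the symptom disappear: with delegation to the parent, both sides see the single copy of the class.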
Hi,
I tried to reproduce the problem given your description, Ryan. I submitted the test job to a vanilla Flink 1.4.0 cluster (the Hadoop-free version downloaded from flink.apache.org, the Hadoop 2.7 version downloaded from flink.apache.org, and a cluster built from source). However, I was not able to reproduce the problem, so I suspect it has something to do with your setup or mine.

To diagnose this further, it would be tremendously helpful if you could share the logs contained in the log directory with us.

Cheers,
Till

On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <[hidden email]> wrote:
> Hi!
>
> This may be due to the changed classloading semantics.
>
> Just to verify this, can you check whether it gets solved by setting the
> following in the Flink configuration: "classloader.resolve-order: parent-first"
>
> By default, Flink 1.4 now uses inverted (child-first) classloading to allow
> users to use their own copies of dependencies, irrespective of what the
> underlying classpath is polluted with. You can, for example, use a different
> Avro version than the one Hadoop pulls in, even without shading, or even
> different Akka / Jackson / etc. versions.
>
> That is a nice improvement, but it may have some impact on tools that were
> built before. When you see ClassCastExceptions (like "X cannot be cast to X"),
> that is probably because the classloader duplicates a dependency from the JVM
> classpath in user space, and objects/classes move between the two domains.
>
> Stephan
>
> On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <[hidden email]> wrote:
>
> > I am facing the same issue:
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> >
> > Can anyone explain the exception?
> >
> > Thanks
> >
> > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <[hidden email]> wrote:
> >
> > > [quoted JIRA issue FLINK-8256 elided; see the report at the top of this thread]
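For reference, the configuration change Stephan suggests would go into flink-conf.yaml roughly like this. This is a sketch only; the default value of `classloader.parent-first-patterns` shown below is illustrative, so check the classloading section of the Flink 1.4 documentation for the exact default list before overriding it:

```yaml
# Option 1: restore the pre-1.4 behavior by resolving classes parent-first.
classloader.resolve-order: parent-first

# Option 2 (alternative): keep child-first resolution, but force scala.*
# to load from the parent classloader. When overriding, keep all existing
# parent-first patterns and append "scala." -- this list is a guess at the
# 1.4 defaults, not authoritative.
classloader.parent-first-patterns: "java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;scala."
```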
@Shivam and @Ryan:
My first guess would be the following: you have the Scala library in your user-code jar, and thus, through the reversed (child-first) classloading, the Scala function types get duplicated.

The right way to fix that is to make sure you build a proper jar file without any provided dependencies. Make sure you set "-Pbuild-jar" when packaging your program.

- You could also set "classloader.resolve-order: parent-first" in your configuration to restore the old classloading behavior.

- We should add "scala" to the default value of "classloader.parent-first-patterns". You can add it yourself in the configuration (make sure you keep all the existing parent-first patterns as well).

On Thu, Dec 14, 2017 at 12:14 PM, Till Rohrmann <[hidden email]> wrote:
> [earlier messages in the thread quoted here; elided]
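The "-Pbuild-jar" flag Stephan mentions activates a profile in the Flink quickstart Maven archetype whose effect is, roughly, to keep Flink and Scala out of the fat jar. If you maintain your own pom, the equivalent is to mark those dependencies as provided. A sketch only, assuming a Maven build; the versions and artifact ids here are illustrative and should be adjusted to your project:

```xml
<!-- Keep the Scala library and Flink itself out of the user-code jar so the
     child-first classloader cannot duplicate scala.* classes.
     Versions/artifactIds are illustrative. -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.12</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.4.0</version>
  <scope>provided</scope>
</dependency>
```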
Hi Stephan,
Thanks for your help. Reverting the classloading to parent-first *resolved this issue*. Thanks for that, but I have one question: I am building a fat jar without any dependency marked as provided, and I am using proto-java version 3.4.0, while Flink uses a pretty old version (2.5.0, I think). When I submit my jar, which version will it pick, 2.5.0 or 3.4.0, in the case of parent-first classloading?

Thanks

On Thu, Dec 14, 2017 at 5:30 PM, Stephan Ewen <[hidden email]> wrote:
> [earlier messages in the thread quoted here; elided]
> > > > > at org.apache.flink.runtime.jobmanager.JobManager$$ > > > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$ > > > > > mcV$sp(JobManager.scala:897) > > > > > at org.apache.flink.runtime.jobmanager.JobManager$$ > > > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobMana > > > > ger.scala:840) > > > > > at org.apache.flink.runtime.jobmanager.JobManager$$ > > > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobMana > > > > ger.scala:840) > > > > > at scala.concurrent.impl.Future$PromiseCompletingRunnable. > > > > > liftedTree1$1(Future.scala:24) > > > > > at scala.concurrent.impl.Future$ > > PromiseCompletingRunnable.run( > > > > > Future.scala:24) > > > > > at akka.dispatch.TaskInvocation. > > run(AbstractDispatcher.scala: > > > 39) > > > > > at akka.dispatch.ForkJoinExecutorConfigurator$ > > > > > AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > > > > > at scala.concurrent.forkjoin.ForkJoinTask.doExec( > > > > > ForkJoinTask.java:260) > > > > > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. > > > > > runTask(ForkJoinPool.java:1339) > > > > > at scala.concurrent.forkjoin.ForkJoinPool.runWorker( > > > > > ForkJoinPool.java:1979) > > > > > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( > > > > > ForkJoinWorkerThread.java:107) > > > > > Caused by: org.apache.flink.streaming.runtime.tasks. > > > StreamTaskException: > > > > > Cannot instantiate user function. > > > > > at org.apache.flink.streaming.api.graph.StreamConfig. > > > > > getStreamOperator(StreamConfig.java:235) > > > > > at org.apache.flink.streaming.runtime.tasks.OperatorChain. > > > > > createChainedOperator(OperatorChain.java:355) > > > > > at org.apache.flink.streaming.runtime.tasks.OperatorChain. > > > > > createOutputCollector(OperatorChain.java:282) > > > > > at org.apache.flink.streaming. > runtime.tasks.OperatorChain.< > > > > > init>(OperatorChain.java:126) > > > > > at org.apache.flink.streaming.runtime.tasks.StreamTask. 
> > > > > invoke(StreamTask.java:231) > > > > > at org.apache.flink.runtime.taskmanager.Task.run(Task. > > > java:718) > > > > > at java.lang.Thread.run(Thread.java:748) > > > > > Caused by: java.lang.ClassCastException: cannot assign instance of > > > > > org.peopleinmotion.TestFunction$$anonfun$1 to field > > > > > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 > > of > > > > > type scala.Function1 in instance of org.apache.flink.streaming. > > > > > api.scala.DataStream$$anon$7 > > > > > at java.io.ObjectStreamClass$FieldReflector. > > setObjFieldValues( > > > > > ObjectStreamClass.java:2233) > > > > > at java.io.ObjectStreamClass.setObjFieldValues( > > > > > ObjectStreamClass.java:1405) > > > > > at java.io.ObjectInputStream.defaultReadFields( > > > > > ObjectInputStream.java:2288) > > > > > at java.io.ObjectInputStream.readSerialData( > > > > > ObjectInputStream.java:2206) > > > > > at java.io.ObjectInputStream.readOrdinaryObject( > > > > > ObjectInputStream.java:2064) > > > > > at java.io.ObjectInputStream. > readObject0(ObjectInputStream. > > > > > java:1568) > > > > > at java.io.ObjectInputStream.defaultReadFields( > > > > > ObjectInputStream.java:2282) > > > > > at java.io.ObjectInputStream.readSerialData( > > > > > ObjectInputStream.java:2206) > > > > > at java.io.ObjectInputStream.readOrdinaryObject( > > > > > ObjectInputStream.java:2064) > > > > > at java.io.ObjectInputStream. > readObject0(ObjectInputStream. > > > > > java:1568) > > > > > at java.io.ObjectInputStream.readObject(ObjectInputStream. > > > > > java:428) > > > > > at org.apache.flink.util.InstantiationUtil. > > deserializeObject( > > > > > InstantiationUtil.java:290) > > > > > at org.apache.flink.util.InstantiationUtil. > > > readObjectFromConfig( > > > > > InstantiationUtil.java:248) > > > > > at org.apache.flink.streaming.api.graph.StreamConfig. 
> > > > > getStreamOperator(StreamConfig.java:220) > > > > > > > > > > {code} > > > > > > > > > > However, replacing the function with this results in everything > > working > > > > as > > > > > expected: > > > > > > > > > > {code:java} > > > > > val stream = env.fromCollection(someArray).filter(new > > > > FilterFunction[Int] > > > > > { > > > > > override def filter(t: Int): Boolean = true > > > > > }) > > > > > {code} > > > > > > > > > > Perhaps something changed in the new build compared to the > previous, > > as > > > > > this was working without issue before? > > > > > > > > > > > > > > > > > > > > -- > > > > > This message was sent by Atlassian JIRA > > > > > (v6.4.14#64029) > > > > > > > > > > > > > > > > > > > > > -- > > > > Shivam Sharma > > > > Data Engineer @ Goibibo > > > > Indian Institute Of Information Technology, Design and Manufacturing > > > > Jabalpur > > > > Mobile No- (+91) 8882114744 > > > > Email:- [hidden email] > > > > LinkedIn:-*https://www.linkedin.com/in/28shivamsharma > > > > <https://www.linkedin.com/in/28shivamsharma>* > > > > > > > > > > -- Shivam Sharma Data Engineer @ Goibibo Indian Institute Of Information Technology, Design and Manufacturing Jabalpur Mobile No- (+91) 8882114744 Email:- [hidden email] LinkedIn:-*https://www.linkedin.com/in/28shivamsharma <https://www.linkedin.com/in/28shivamsharma>* |
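The failure mechanism can be illustrated without Flink at all. The sketch below (plain Scala, hypothetical demo code, not Flink's implementation) does what Flink does with the filter lambda: serialize it with Java serialization and read it back. The round trip succeeds here because serializer and deserializer share one classloader, so scala.Function1 resolves to the same Class object; with Flink 1.4's child-first user-code classloader and a second copy of the Scala library in the fat jar, Function1 is duplicated and the field assignment fails with exactly the ClassCastException quoted above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LambdaSerializationDemo {
  def main(args: Array[String]): Unit = {
    // A Scala function literal compiles to a serializable Function1 instance,
    // which is why Flink can ship it to the task managers at all.
    val filterFun: Int => Boolean = _ => true

    // Serialize the lambda, as Flink does when building the job graph.
    val bytes = {
      val buf = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buf)
      out.writeObject(filterFun)
      out.close()
      buf.toByteArray
    }

    // Deserialize in the same classloader: Function1 is the same Class
    // object on both sides, so the cast succeeds.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val restored = in.readObject().asInstanceOf[Int => Boolean]
    println(restored(42)) // prints: true
  }
}
```

The key point is that "cannot assign instance of X to field of type Y" is decided by Class identity (class name plus defining classloader), not by class name alone, which is why the message can effectively say "X cannot be cast to X".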
@Shivam I think Flink 1.4 should not expose any ProtoBuf dependency any
more. We shade it in Mesos and use a ProtoBuf-free Akka version now.

On Thu, Dec 14, 2017 at 8:41 PM, Shivam Sharma <[hidden email]> wrote:
> Hi Stephan,
>
> Thanks for your help. Reverting the classloading to parent-first *resolved
> this issue*. Thanks for this, but I have one question:
>
> I am building a fat jar without any dependency marked as Provided, and I
> am using proto-java version 3.4.0, but I think Flink uses a pretty old
> version (I think 2.5.0). When I submit my jar, which version will it pick,
> 2.5.0 or 3.4.0, in the case of parent-first classloading?
>
> Thanks
>
> On Thu, Dec 14, 2017 at 5:30 PM, Stephan Ewen <[hidden email]> wrote:
> > @Shivam and @Ryan:
> >
> > My first feeling would be the following: you have the Scala library in
> > your user code, and thus, through the reversed classloading, the Scala
> > function types get duplicated.
> >
> > The right way to fix that is to make sure you build a proper jar file
> > without any provided dependencies. Make sure you set "-Pbuild-jar" when
> > packaging your program.
> >
> > - You could also set "classloader.resolve-order: parent-first" in your
> > configuration to restore the old classloading style.
> >
> > - We should add "scala" to the default value for
> > "classloader.parent-first-patterns". You can add it yourself in the
> > configuration (make sure you keep all existing parent-first patterns as
> > well).
> >
> > On Thu, Dec 14, 2017 at 12:14 PM, Till Rohrmann <[hidden email]> wrote:
> > > Hi,
> > >
> > > I tried to reproduce the problem given your description, Ryan. I
> > > submitted the test job to a vanilla Flink 1.4.0 cluster (Hadoop-free
> > > version downloaded from flink.apache.org, Hadoop 2.7 version
> > > downloaded from flink.apache.org, and a cluster built from sources).
> > > However, I was not able to reproduce the problem. Therefore I suspect
> > > that it has something to do with your or my setup.
> > > In order to further diagnose the problem, it would be tremendously
> > > helpful if you could share the logs contained in the logs directory
> > > with us.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <[hidden email]> wrote:
> > > > Hi!
> > > >
> > > > This may be due to the changed classloading semantics.
> > > >
> > > > Just to verify this, can you check whether it gets solved by setting
> > > > the following in the Flink configuration:
> > > > "classloader.resolve-order: parent-first"
> > > >
> > > > By default, Flink 1.4 now uses inverted classloading to allow users
> > > > to use their own copies of dependencies, irrespective of what the
> > > > underlying classpath is spoiled with. You can, for example, use a
> > > > different Avro version than Hadoop pulls in, even without shading,
> > > > or even different Akka / Jackson / etc. versions.
> > > >
> > > > That is a nice improvement, but it may have some impact on tools
> > > > that were built before. When you see class-cast exceptions (like X
> > > > cannot be cast to X), that is probably caused by the fact that the
> > > > classloader duplicates a dependency from the JVM classpath in user
> > > > space, but objects/classes move between the domains.
> > > >
> > > > Stephan
> > > >
> > > > On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <[hidden email]> wrote:
> > > > > Same Issue I am facing :-
> > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.
> > > > > com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> > > > >
> > > > > Can anyone explain the exception?
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <[hidden email]> wrote:
> > > > > > [text of FLINK-8256, stack trace, and workaround trimmed; quoted
> > > > > > in full earlier in this thread]
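The two configuration changes Stephan suggests go into conf/flink-conf.yaml. A sketch, with the caveat that the parent-first pattern list shown is illustrative only (keep the default list your distribution ships and append "scala."), not the value shipped with Flink 1.4.0:

```yaml
# Option 1: restore the pre-1.4 classloading order entirely.
classloader.resolve-order: parent-first

# Option 2 (alternative): keep child-first loading, but resolve scala.*
# parent-first. Preserve the existing default patterns and append "scala.";
# the list below is an illustrative placeholder, not the shipped default.
# classloader.parent-first-patterns: java.;org.apache.flink.;scala.
```

Either option prevents the user-code classloader from loading a second copy of the Scala library, which is what duplicates scala.Function1 and triggers the ClassCastException in this thread.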