Hi!
I am trying to implement a Scala shell for Flink. I've started with a simple Scala object whose main function drops the user into the interactive Scala shell (REPL) at one point:

import scala.tools.nsc.interpreter.ILoop
import scala.tools.nsc.Settings

object Job {
  def main(args: Array[String]) {

    val repl = new ILoop()
    repl.settings = new Settings()

    // enable this line to use scala in intellij
    repl.settings.usejavacp.value = true

    repl.createInterpreter()

    // start scala interpreter shell
    repl.process(repl.settings)

    repl.closeInterpreter()
  }
}

Now I am trying to execute the word count example, as in:

scala> import org.apache.flink.api.scala._

scala> val env = ExecutionEnvironment.getExecutionEnvironment

scala> val text = env.fromElements("To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,")

scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)

scala> counts.print()

scala> env.execute("Flink Scala Api Skeleton")

However, I am running into the following error:

env.execute("Flink Scala Api Skeleton")
org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
Caused by: java.lang.RuntimeException: The type serializer factory could not load its parameters from the configuration due to missing classes.
at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
... 8 more
Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
... 13 more

at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
at .<init>(<console>:12)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
at org.myorg.quickstart.Job$.main(Job.scala:37)
at org.myorg.quickstart.Job.main(Job.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

I'm pretty new to Scala and Flink, so maybe someone has a suggestion or can point me in some direction?

thanks,
Nikolaas
Hey Nikolaas,
Thank you for posting on the mailing list. I met Nikolaas in person today and we were talking a bit about an interactive shell for Flink, and potentially also an integration with Zeppelin. Great stuff that I'm really looking forward to :)

We were wondering whether somebody on the list has some experience with the Scala shell. I've also pointed Nikolaas to this PR: https://github.com/apache/flink/pull/35.

Best,
Robert

On Tue, Apr 14, 2015 at 5:26 PM, nse sik <[hidden email]> wrote:
> Hi!
> I am trying to implement a scala shell for flink.
> [...]
I will look into it once I have some time (end of this week or next week, probably).

On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <[hidden email]> wrote:
> Hey Nikolaas,
>
> Thank you for posting on the mailing list.
> [...]
To give a bit of context for the exception:
To execute a program, the classes of the user functions need to be available to the executing TaskManagers.

- If you execute locally from the IDE, all classes are on the classpath anyway.
- If you use the remote environment, you need to attach the jar file to the environment.
- In your case (REPL), you need to make sure that the generated classes are shipped to the TaskManagers. In that sense, the approach is probably similar to executing with a remote environment - only that you do not have a jar file up front, but need to generate it on the fly. As Robert mentioned, https://github.com/apache/flink/pull/35 may have a first solution for that. Other approaches are also possible, like simply always bundling all classes from the directory where the REPL puts its generated classes.

Greetings,
Stephan

On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek <[hidden email]> wrote:
> I will look into it once I have some time (end of this week, or next
> week probably)
> [...]
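A minimal sketch of that remote-environment path, for reference (the host, port, and jar path below are placeholders, not values from this thread):

import org.apache.flink.api.scala._

object RemoteWordCount {
  def main(args: Array[String]) {
    // A remote environment ships the listed jar(s) to the cluster, so the
    // TaskManagers can load the user-function classes referenced by the job.
    val env = ExecutionEnvironment.createRemoteEnvironment(
      "jobmanager-host", 6123, "/path/to/user-functions.jar")

    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
    env.execute("Remote WordCount")
  }
}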
Thanks for the feedback guys!
Apparently the Scala shell compiles the shell input into some kind of virtual directory. It should be possible to create a jar from its contents and then hand that over to Flink for execution in some way. I will investigate further.

cheers,
Nikolaas

2015-04-15 11:20 GMT+02:00 Stephan Ewen <[hidden email]>:
> To give a bit of context for the exception:
> [...]
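A rough sketch of that idea, assuming the interpreter's compiled classes are reachable as a scala.reflect.io.AbstractFile (the helper and the virtualDirectory accessor below are illustrative, not an existing Flink API):

import java.io.{File, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}
import scala.reflect.io.AbstractFile

object ReplJarPackager {
  // Recursively copy the class files that the REPL compiled into its
  // (virtual) output directory into a jar file. That jar could then be
  // attached to a remote environment like any other user-code jar.
  def packageReplClasses(replOutputDir: AbstractFile, jarFile: File): Unit = {
    val jar = new JarOutputStream(new FileOutputStream(jarFile))
    try {
      def addDir(dir: AbstractFile, prefix: String): Unit = {
        for (entry <- dir) {
          if (entry.isDirectory) {
            addDir(entry, prefix + entry.name + "/")
          } else {
            jar.putNextEntry(new JarEntry(prefix + entry.name))
            jar.write(entry.toByteArray)
            jar.closeEntry()
          }
        }
      }
      addDir(replOutputDir, "")
    } finally {
      jar.close()
    }
  }
}

// e.g. ReplJarPackager.packageReplClasses(repl.intp.virtualDirectory,
//                                         new File("/tmp/repl-classes.jar"))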
Great, let us know if you run into any issues.
Can you create a JIRA issue for the REPL and link to your repository, so the community can track the status?

On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s <[hidden email]> wrote:
> Thanks for the feedback guys!
> [...]
I would also keep an eye on this issue from the Zeppelin project:
https://issues.apache.org/jira/browse/ZEPPELIN-44

The needed infrastructure is going to be very similar.

On Thu, Apr 16, 2015 at 10:15 AM, Kostas Tzoumas <[hidden email]> wrote:
> Great, let us know if you run into any issues.
> [...]