Flink interactive Scala shell

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink interactive Scala shell

nse sik
Hi!
I am trying to implement a scala shell for flink.

I've started with a simple scala object who's main function will drop the
user to the interactive scala shell (repl) at one point:




import scala.tools.nsc.interpreter.ILoop
import scala.tools.nsc.Settings

object Job {
  def main(args: Array[String]) {

    val repl = new ILoop()
    repl.settings = new Settings()

    // enable this line to use scala in intellij
    repl.settings.usejavacp.value = true

    repl.createInterpreter()

    // start scala interpreter shell
    repl.process(repl.settings)

    repl.closeInterpreter()
    }
  }




Now I am trying to execute the word count example as in:




scala> import org.apache.flink.api.scala._

scala> val env = ExecutionEnvironment.getExecutionEnvironment

scala> val text = env.fromElements("To be, or not to be,--that is the
question:--","Whether 'tis nobler in the mind to suffer", "The slings and
arrows of outrageous fortune","Or to take arms against a sea of troubles,")

scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_,
1) }.groupBy(0).sum(1)

scala> counts.print()

scala> env.execute("Flink Scala Api Skeleton")






However I am running into following error:

env.execute("Flink Scala Api Skeleton")
org.apache.flink.runtime.client.JobExecutionException:
java.lang.RuntimeException: The initialization of the DataSource's outputs
caused an error: The type serializer factory could not load its parameters
from the configuration due to missing classes.
at
org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
at
org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
at
org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
Caused by: java.lang.RuntimeException: The type serializer factory could
not load its parameters from the configuration due to missing classes.
at
org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
at
org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
at
org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
at
org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
at
org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
at
org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
... 8 more
Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
at
org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
at
org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
... 13 more

at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
at .<init>(<console>:12)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
at org.myorg.quickstart.Job$.main(Job.scala:37)
at org.myorg.quickstart.Job.main(Job.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)



I'm pretty new to Scala and Flink, so maybe someone has a suggestion or can
point me in some direction?

thanks,
Nikolaas
Reply | Threaded
Open this post in threaded view
|

Re: Flink interactive Scala shell

Robert Metzger
Hey Nikolaas,

Thank you for posting on the mailing list. I've met Nikolaas today in
person and we were talking a bit about an interactive shell for Flink,
potentially also an integration with Zeppelin.

Great stuff I'm really looking forward to :)

We were wondering if somebody from the list has some experience with the
scala shell.
I've pointed Nikolaas also to this PR:
https://github.com/apache/flink/pull/35.

Best,
Robert


On Tue, Apr 14, 2015 at 5:26 PM, nse sik <[hidden email]>
wrote:

> Hi!
> I am trying to implement a scala shell for flink.
>
> I've started with a simple scala object who's main function will drop the
> user to the interactive scala shell (repl) at one point:
>
>
>
>
> import scala.tools.nsc.interpreter.ILoop
> import scala.tools.nsc.Settings
>
> object Job {
>   def main(args: Array[String]) {
>
>     val repl = new ILoop()
>     repl.settings = new Settings()
>
>     // enable this line to use scala in intellij
>     repl.settings.usejavacp.value = true
>
>     repl.createInterpreter()
>
>     // start scala interpreter shell
>     repl.process(repl.settings)
>
>     repl.closeInterpreter()
>     }
>   }
>
>
>
>
> Now I am trying to execute the word count example as in:
>
>
>
>
> scala> import org.apache.flink.api.scala._
>
> scala> val env = ExecutionEnvironment.getExecutionEnvironment
>
> scala> val text = env.fromElements("To be, or not to be,--that is the
> question:--","Whether 'tis nobler in the mind to suffer", "The slings and
> arrows of outrageous fortune","Or to take arms against a sea of troubles,")
>
> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_,
> 1) }.groupBy(0).sum(1)
>
> scala> counts.print()
>
> scala> env.execute("Flink Scala Api Skeleton")
>
>
>
>
>
>
> However I am running into following error:
>
> env.execute("Flink Scala Api Skeleton")
> org.apache.flink.runtime.client.JobExecutionException:
> java.lang.RuntimeException: The initialization of the DataSource's outputs
> caused an error: The type serializer factory could not load its parameters
> from the configuration due to missing classes.
> at
>
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> at
>
> org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> at
>
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> Caused by: java.lang.RuntimeException: The type serializer factory could
> not load its parameters from the configuration due to missing classes.
> at
>
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> at
>
> org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> at
>
> org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> at
>
> org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> at
>
> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> at
>
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> ... 8 more
> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:274)
> at
>
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> at
>
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> at
>
> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> at
>
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> ... 13 more
>
> at
>
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> at
>
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> at
>
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> at .<init>(<console>:12)
> at .<clinit>(<console>)
> at .<init>(<console>:7)
> at .<clinit>(<console>)
> at $print(<console>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> at
>
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> at
>
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> at org.myorg.quickstart.Job$.main(Job.scala:37)
> at org.myorg.quickstart.Job.main(Job.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
>
>
>
> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or can
> point me in some direction?
>
> thanks,
> Nikolaas
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink interactive Scala shell

Aljoscha Krettek-2
I will look into it once I have some time (end of this week, or next
week probably)

On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <[hidden email]> wrote:

> Hey Nikolaas,
>
> Thank you for posting on the mailing list. I've met Nikolaas today in
> person and we were talking a bit about an interactive shell for Flink,
> potentially also an integration with Zeppelin.
>
> Great stuff I'm really looking forward to :)
>
> We were wondering if somebody from the list has some experience with the
> scala shell.
> I've pointed Nikolaas also to this PR:
> https://github.com/apache/flink/pull/35.
>
> Best,
> Robert
>
>
> On Tue, Apr 14, 2015 at 5:26 PM, nse sik <[hidden email]>
> wrote:
>
>> Hi!
>> I am trying to implement a scala shell for flink.
>>
>> I've started with a simple scala object who's main function will drop the
>> user to the interactive scala shell (repl) at one point:
>>
>>
>>
>>
>> import scala.tools.nsc.interpreter.ILoop
>> import scala.tools.nsc.Settings
>>
>> object Job {
>>   def main(args: Array[String]) {
>>
>>     val repl = new ILoop()
>>     repl.settings = new Settings()
>>
>>     // enable this line to use scala in intellij
>>     repl.settings.usejavacp.value = true
>>
>>     repl.createInterpreter()
>>
>>     // start scala interpreter shell
>>     repl.process(repl.settings)
>>
>>     repl.closeInterpreter()
>>     }
>>   }
>>
>>
>>
>>
>> Now I am trying to execute the word count example as in:
>>
>>
>>
>>
>> scala> import org.apache.flink.api.scala._
>>
>> scala> val env = ExecutionEnvironment.getExecutionEnvironment
>>
>> scala> val text = env.fromElements("To be, or not to be,--that is the
>> question:--","Whether 'tis nobler in the mind to suffer", "The slings and
>> arrows of outrageous fortune","Or to take arms against a sea of troubles,")
>>
>> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_,
>> 1) }.groupBy(0).sum(1)
>>
>> scala> counts.print()
>>
>> scala> env.execute("Flink Scala Api Skeleton")
>>
>>
>>
>>
>>
>>
>> However I am running into following error:
>>
>> env.execute("Flink Scala Api Skeleton")
>> org.apache.flink.runtime.client.JobExecutionException:
>> java.lang.RuntimeException: The initialization of the DataSource's outputs
>> caused an error: The type serializer factory could not load its parameters
>> from the configuration due to missing classes.
>> at
>>
>> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
>> at
>>
>> org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
>> at
>>
>> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
>> Caused by: java.lang.RuntimeException: The type serializer factory could
>> not load its parameters from the configuration due to missing classes.
>> at
>>
>> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
>> at
>>
>> org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
>> at
>>
>> org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
>> at
>>
>> org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
>> at
>>
>> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
>> at
>>
>> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
>> ... 8 more
>> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:274)
>> at
>>
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>>
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
>> at
>>
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
>> at
>>
>> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
>> ... 13 more
>>
>> at
>>
>> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
>> at
>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
>> at
>>
>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
>> at
>>
>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
>> at .<init>(<console>:12)
>> at .<clinit>(<console>)
>> at .<init>(<console>:7)
>> at .<clinit>(<console>)
>> at $print(<console>)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>> at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>> at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>> at
>>
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>> at
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>> at
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>> at
>>
>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>> at org.myorg.quickstart.Job$.main(Job.scala:37)
>> at org.myorg.quickstart.Job.main(Job.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
>>
>>
>>
>> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or can
>> point me in some direction?
>>
>> thanks,
>> Nikolaas
>>
Reply | Threaded
Open this post in threaded view
|

Re: Flink interactive Scala shell

Stephan Ewen
To give a bit of context for the exception:

To execute a program, the classes of the user functions need to be
available the executing TaskManagers.

 - If you execute locally from the IDE, all classes are in the classpath
anyways.
 - If you use the remote environment, you need to attach the jar file to
environment.

 - In your case (repl), you need to make sure that the generated classes
are given to the TaskManager. In that sense, the approach is probably
similar to the case of executing with a remote environment - only that you
do not have a jar file up front, but need to generate it on the fly. As
Robert mentioned, https://github.com/apache/flink/pull/35 may have a first
solution to that. Other approaches are also possible, like simply always
bundling all classes in the directory where the repl puts its generated
classes.

Greetings,
Stephan


On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek <[hidden email]>
wrote:

> I will look into it once I have some time (end of this week, or next
> week probably)
>
> On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <[hidden email]>
> wrote:
> > Hey Nikolaas,
> >
> > Thank you for posting on the mailing list. I've met Nikolaas today in
> > person and we were talking a bit about an interactive shell for Flink,
> > potentially also an integration with Zeppelin.
> >
> > Great stuff I'm really looking forward to :)
> >
> > We were wondering if somebody from the list has some experience with the
> > scala shell.
> > I've pointed Nikolaas also to this PR:
> > https://github.com/apache/flink/pull/35.
> >
> > Best,
> > Robert
> >
> >
> > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <[hidden email]
> >
> > wrote:
> >
> >> Hi!
> >> I am trying to implement a scala shell for flink.
> >>
> >> I've started with a simple scala object who's main function will drop
> the
> >> user to the interactive scala shell (repl) at one point:
> >>
> >>
> >>
> >>
> >> import scala.tools.nsc.interpreter.ILoop
> >> import scala.tools.nsc.Settings
> >>
> >> object Job {
> >>   def main(args: Array[String]) {
> >>
> >>     val repl = new ILoop()
> >>     repl.settings = new Settings()
> >>
> >>     // enable this line to use scala in intellij
> >>     repl.settings.usejavacp.value = true
> >>
> >>     repl.createInterpreter()
> >>
> >>     // start scala interpreter shell
> >>     repl.process(repl.settings)
> >>
> >>     repl.closeInterpreter()
> >>     }
> >>   }
> >>
> >>
> >>
> >>
> >> Now I am trying to execute the word count example as in:
> >>
> >>
> >>
> >>
> >> scala> import org.apache.flink.api.scala._
> >>
> >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> >>
> >> scala> val text = env.fromElements("To be, or not to be,--that is the
> >> question:--","Whether 'tis nobler in the mind to suffer", "The slings
> and
> >> arrows of outrageous fortune","Or to take arms against a sea of
> troubles,")
> >>
> >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map {
> (_,
> >> 1) }.groupBy(0).sum(1)
> >>
> >> scala> counts.print()
> >>
> >> scala> env.execute("Flink Scala Api Skeleton")
> >>
> >>
> >>
> >>
> >>
> >>
> >> However I am running into following error:
> >>
> >> env.execute("Flink Scala Api Skeleton")
> >> org.apache.flink.runtime.client.JobExecutionException:
> >> java.lang.RuntimeException: The initialization of the DataSource's
> outputs
> >> caused an error: The type serializer factory could not load its
> parameters
> >> from the configuration due to missing classes.
> >> at
> >>
> >>
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> >> at
> >>
> >>
> org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> >> at
> >>
> >>
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> >>
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> at
> >>
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:606)
> >> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> >> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> >> Caused by: java.lang.RuntimeException: The type serializer factory could
> >> not load its parameters from the configuration due to missing classes.
> >> at
> >>
> >>
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> >> at
> >>
> >>
> org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> >> at
> >>
> >>
> org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> >> at
> >>
> >>
> org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> >> at
> >>
> >>
> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> >> at
> >>
> >>
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> >> ... 8 more
> >> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> >> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >> at java.security.AccessController.doPrivileged(Native Method)
> >> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> >> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >> at java.lang.Class.forName0(Native Method)
> >> at java.lang.Class.forName(Class.java:274)
> >> at
> >>
> >>
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> >> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> >> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> >> at
> >>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >> at
> >>
> >>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> >> at
> >>
> >>
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> >> at
> >>
> >>
> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> >> at
> >>
> >>
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> >> ... 13 more
> >>
> >> at
> >>
> >>
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> >> at
> >>
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> >> at
> >>
> >>
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> >> at
> >>
> >>
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> >> at .<init>(<console>:12)
> >> at .<clinit>(<console>)
> >> at .<init>(<console>:7)
> >> at .<clinit>(<console>)
> >> at $print(<console>)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> >>
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> at
> >>
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:606)
> >> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> >> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> >> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> >> at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> >> at
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> >> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> >> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> >> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> >> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> >> at
> >>
> >>
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> >> at
> >>
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> >> at
> >>
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> >> at
> >>
> >>
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> >> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> >> at org.myorg.quickstart.Job$.main(Job.scala:37)
> >> at org.myorg.quickstart.Job.main(Job.scala)
> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> at
> >>
> >>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >> at
> >>
> >>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> at java.lang.reflect.Method.invoke(Method.java:606)
> >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> >>
> >>
> >>
> >> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or
> can
> >> point me in some direction?
> >>
> >> thanks,
> >> Nikolaas
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink interactive Scala shell

nse sik
Thanks for the feedback guys!
Apparently The Scala Shell compiles the Shell input to some kind of virtual
directory.
It should be possible to create a jar from it's content and then hand it
over to Flink for execution in some way.
I will further investigate..

cheers,
Nikolaas

2015-04-15 11:20 GMT+02:00 Stephan Ewen <[hidden email]>:

> To give a bit of context for the exception:
>
> To execute a program, the classes of the user functions need to be
> available the executing TaskManagers.
>
>  - If you execute locally from the IDE, all classes are in the classpath
> anyways.
>  - If you use the remote environment, you need to attach the jar file to
> environment.
>
>  - In your case (repl), you need to make sure that the generated classes
> are given to the TaskManager. In that sense, the approach is probably
> similar to the case of executing with a remote environment - only that you
> do not have a jar file up front, but need to generate it on the fly. As
> Robert mentioned, https://github.com/apache/flink/pull/35 may have a first
> solution to that. Other approaches are also possible, like simply always
> bundling all classes in the directory where the repl puts its generated
> classes.
>
> Greetings,
> Stephan
>
>
> On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek <[hidden email]>
> wrote:
>
> > I will look into it once I have some time (end of this week, or next
> > week probably)
> >
> > On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <[hidden email]>
> > wrote:
> > > Hey Nikolaas,
> > >
> > > Thank you for posting on the mailing list. I've met Nikolaas today in
> > > person and we were talking a bit about an interactive shell for Flink,
> > > potentially also an integration with Zeppelin.
> > >
> > > Great stuff I'm really looking forward to :)
> > >
> > > We were wondering if somebody from the list has some experience with
> the
> > > scala shell.
> > > I've pointed Nikolaas also to this PR:
> > > https://github.com/apache/flink/pull/35.
> > >
> > > Best,
> > > Robert
> > >
> > >
> > > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <
> [hidden email]
> > >
> > > wrote:
> > >
> > >> Hi!
> > >> I am trying to implement a scala shell for flink.
> > >>
> > >> I've started with a simple scala object who's main function will drop
> > the
> > >> user to the interactive scala shell (repl) at one point:
> > >>
> > >>
> > >>
> > >>
> > >> import scala.tools.nsc.interpreter.ILoop
> > >> import scala.tools.nsc.Settings
> > >>
> > >> object Job {
> > >>   def main(args: Array[String]) {
> > >>
> > >>     val repl = new ILoop()
> > >>     repl.settings = new Settings()
> > >>
> > >>     // enable this line to use scala in intellij
> > >>     repl.settings.usejavacp.value = true
> > >>
> > >>     repl.createInterpreter()
> > >>
> > >>     // start scala interpreter shell
> > >>     repl.process(repl.settings)
> > >>
> > >>     repl.closeInterpreter()
> > >>     }
> > >>   }
> > >>
> > >>
> > >>
> > >>
> > >> Now I am trying to execute the word count example as in:
> > >>
> > >>
> > >>
> > >>
> > >> scala> import org.apache.flink.api.scala._
> > >>
> > >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> > >>
> > >> scala> val text = env.fromElements("To be, or not to be,--that is the
> > >> question:--","Whether 'tis nobler in the mind to suffer", "The slings
> > and
> > >> arrows of outrageous fortune","Or to take arms against a sea of
> > troubles,")
> > >>
> > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map {
> > (_,
> > >> 1) }.groupBy(0).sum(1)
> > >>
> > >> scala> counts.print()
> > >>
> > >> scala> env.execute("Flink Scala Api Skeleton")
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> However I am running into following error:
> > >>
> > >> env.execute("Flink Scala Api Skeleton")
> > >> org.apache.flink.runtime.client.JobExecutionException:
> > >> java.lang.RuntimeException: The initialization of the DataSource's
> > outputs
> > >> caused an error: The type serializer factory could not load its
> > parameters
> > >> from the configuration due to missing classes.
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >> at
> > >>
> > >>
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > >> at
> > >>
> > >>
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > >> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> > >> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> > >> Caused by: java.lang.RuntimeException: The type serializer factory
> could
> > >> not load its parameters from the configuration due to missing classes.
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> > >> ... 8 more
> > >> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> > >> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > >> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > >> at java.security.AccessController.doPrivileged(Native Method)
> > >> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > >> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > >> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > >> at java.lang.Class.forName0(Native Method)
> > >> at java.lang.Class.forName(Class.java:274)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> > >> at
> > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> > >> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> > >> at
> > >>
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> > >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> > >> ... 13 more
> > >>
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> > >> at
> > >>
> > org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> > >> at .<init>(<console>:12)
> > >> at .<clinit>(<console>)
> > >> at .<init>(<console>:7)
> > >> at .<clinit>(<console>)
> > >> at $print(<console>)
> > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >> at
> > >>
> > >>
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > >> at
> > >>
> > >>
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > >> at
> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> > >> at
> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> > >> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> > >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> > >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> > >> at
> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> > >> at
> > scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> > >> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> > >> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> > >> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> > >> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> > >> at
> > >>
> > >>
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> > >> at
> > >>
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > >> at
> > >>
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > >> at
> > >>
> > >>
> >
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> > >> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> > >> at org.myorg.quickstart.Job$.main(Job.scala:37)
> > >> at org.myorg.quickstart.Job.main(Job.scala)
> > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >> at
> > >>
> > >>
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > >> at
> > >>
> > >>
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > >> at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> > >>
> > >>
> > >>
> > >> I'm pretty new to Scala and Flink, so maybe someone has a suggestion
> or
> > can
> > >> point me in some direction?
> > >>
> > >> thanks,
> > >> Nikolaas
> > >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink interactive Scala shell

Kostas Tzoumas-2
Great, let us know if you run into any issues.

Can you create a JIRA on the REPL and link to your repository for the
community to track the status?

On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s <[hidden email]>
wrote:

> Thanks for the feedback guys!
> Apparently The Scala Shell compiles the Shell input to some kind of virtual
> directory.
> It should be possible to create a jar from it's content and then hand it
> over to Flink for execution in some way.
> I will further investigate..
>
> cheers,
> Nikolaas
>
> 2015-04-15 11:20 GMT+02:00 Stephan Ewen <[hidden email]>:
>
> > To give a bit of context for the exception:
> >
> > To execute a program, the classes of the user functions need to be
> > available the executing TaskManagers.
> >
> >  - If you execute locally from the IDE, all classes are in the classpath
> > anyways.
> >  - If you use the remote environment, you need to attach the jar file to
> > environment.
> >
> >  - In your case (repl), you need to make sure that the generated classes
> > are given to the TaskManager. In that sense, the approach is probably
> > similar to the case of executing with a remote environment - only that
> you
> > do not have a jar file up front, but need to generate it on the fly. As
> > Robert mentioned, https://github.com/apache/flink/pull/35 may have a
> first
> > solution to that. Other approaches are also possible, like simply always
> > bundling all classes in the directory where the repl puts its generated
> > classes.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek <[hidden email]>
> > wrote:
> >
> > > I will look into it once I have some time (end of this week, or next
> > > week probably)
> > >
> > > On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <[hidden email]>
> > > wrote:
> > > > Hey Nikolaas,
> > > >
> > > > Thank you for posting on the mailing list. I've met Nikolaas today in
> > > > person and we were talking a bit about an interactive shell for
> Flink,
> > > > potentially also an integration with Zeppelin.
> > > >
> > > > Great stuff I'm really looking forward to :)
> > > >
> > > > We were wondering if somebody from the list has some experience with
> > the
> > > > scala shell.
> > > > I've pointed Nikolaas also to this PR:
> > > > https://github.com/apache/flink/pull/35.
> > > >
> > > > Best,
> > > > Robert
> > > >
> > > >
> > > > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <
> > [hidden email]
> > > >
> > > > wrote:
> > > >
> > > >> Hi!
> > > >> I am trying to implement a scala shell for flink.
> > > >>
> > > >> I've started with a simple scala object who's main function will
> drop
> > > the
> > > >> user to the interactive scala shell (repl) at one point:
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> import scala.tools.nsc.interpreter.ILoop
> > > >> import scala.tools.nsc.Settings
> > > >>
> > > >> object Job {
> > > >>   def main(args: Array[String]) {
> > > >>
> > > >>     val repl = new ILoop()
> > > >>     repl.settings = new Settings()
> > > >>
> > > >>     // enable this line to use scala in intellij
> > > >>     repl.settings.usejavacp.value = true
> > > >>
> > > >>     repl.createInterpreter()
> > > >>
> > > >>     // start scala interpreter shell
> > > >>     repl.process(repl.settings)
> > > >>
> > > >>     repl.closeInterpreter()
> > > >>     }
> > > >>   }
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Now I am trying to execute the word count example as in:
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> scala> import org.apache.flink.api.scala._
> > > >>
> > > >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> > > >>
> > > >> scala> val text = env.fromElements("To be, or not to be,--that is
> the
> > > >> question:--","Whether 'tis nobler in the mind to suffer", "The
> slings
> > > and
> > > >> arrows of outrageous fortune","Or to take arms against a sea of
> > > troubles,")
> > > >>
> > > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")
> }.map {
> > > (_,
> > > >> 1) }.groupBy(0).sum(1)
> > > >>
> > > >> scala> counts.print()
> > > >>
> > > >> scala> env.execute("Flink Scala Api Skeleton")
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> However I am running into following error:
> > > >>
> > > >> env.execute("Flink Scala Api Skeleton")
> > > >> org.apache.flink.runtime.client.JobExecutionException:
> > > >> java.lang.RuntimeException: The initialization of the DataSource's
> > > outputs
> > > >> caused an error: The type serializer factory could not load its
> > > parameters
> > > >> from the configuration due to missing classes.
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >> at
> > > >>
> > > >>
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > >> at
> > > >>
> > > >>
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > >> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> > > >> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> > > >> Caused by: java.lang.RuntimeException: The type serializer factory
> > could
> > > >> not load its parameters from the configuration due to missing
> classes.
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> > > >> ... 8 more
> > > >> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> > > >> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > >> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > >> at java.security.AccessController.doPrivileged(Native Method)
> > > >> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > >> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > >> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > >> at java.lang.Class.forName0(Native Method)
> > > >> at java.lang.Class.forName(Class.java:274)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> > > >> at
> > > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> > > >> at
> > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> > > >> at
> > > >>
> > >
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> > > >> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> > > >> ... 13 more
> > > >>
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> > > >> at
> > > >>
> > >
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> > > >> at
> > > >>
> > > >>
> > >
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> > > >> at .<init>(<console>:12)
> > > >> at .<clinit>(<console>)
> > > >> at .<init>(<console>:7)
> > > >> at .<clinit>(<console>)
> > > >> at $print(<console>)
> > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >> at
> > > >>
> > > >>
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > >> at
> > > >>
> > > >>
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > >> at
> > scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> > > >> at
> > scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> > > >> at
> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> > > >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> > > >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> > > >> at
> > scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> > > >> at
> > >
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> > > >> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> > > >> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> > > >> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> > > >> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> > > >> at
> > > >>
> > > >>
> > >
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> > > >> at
> > > >>
> > >
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > > >> at
> > > >>
> > >
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > > >> at
> > > >>
> > > >>
> > >
> >
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> > > >> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> > > >> at org.myorg.quickstart.Job$.main(Job.scala:37)
> > > >> at org.myorg.quickstart.Job.main(Job.scala)
> > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >> at
> > > >>
> > > >>
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > >> at
> > > >>
> > > >>
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > >> at
> > com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> > > >>
> > > >>
> > > >>
> > > >> I'm pretty new to Scala and Flink, so maybe someone has a suggestion
> > or
> > > can
> > > >> point me in some direction?
> > > >>
> > > >> thanks,
> > > >> Nikolaas
> > > >>
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink interactive Scala shell

Robert Metzger
I would also keep an eye on this issue from the Zeppelin project:
https://issues.apache.org/jira/browse/ZEPPELIN-44
The needed infrastructure is going to be very similar

On Thu, Apr 16, 2015 at 10:15 AM, Kostas Tzoumas <[hidden email]>
wrote:

> Great, let us know if you run into any issues.
>
> Can you create a JIRA on the REPL and link to your repository for the
> community to track the status?
>
> On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s <
> [hidden email]>
> wrote:
>
> > Thanks for the feedback guys!
> > Apparently The Scala Shell compiles the Shell input to some kind of
> virtual
> > directory.
> > It should be possible to create a jar from it's content and then hand it
> > over to Flink for execution in some way.
> > I will further investigate..
> >
> > cheers,
> > Nikolaas
> >
> > 2015-04-15 11:20 GMT+02:00 Stephan Ewen <[hidden email]>:
> >
> > > To give a bit of context for the exception:
> > >
> > > To execute a program, the classes of the user functions need to be
> > > available the executing TaskManagers.
> > >
> > >  - If you execute locally from the IDE, all classes are in the
> classpath
> > > anyways.
> > >  - If you use the remote environment, you need to attach the jar file
> to
> > > environment.
> > >
> > >  - In your case (repl), you need to make sure that the generated
> classes
> > > are given to the TaskManager. In that sense, the approach is probably
> > > similar to the case of executing with a remote environment - only that
> > you
> > > do not have a jar file up front, but need to generate it on the fly. As
> > > Robert mentioned, https://github.com/apache/flink/pull/35 may have a
> > first
> > > solution to that. Other approaches are also possible, like simply
> always
> > > bundling all classes in the directory where the repl puts its generated
> > > classes.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek <
> [hidden email]>
> > > wrote:
> > >
> > > > I will look into it once I have some time (end of this week, or next
> > > > week probably)
> > > >
> > > > On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <[hidden email]
> >
> > > > wrote:
> > > > > Hey Nikolaas,
> > > > >
> > > > > Thank you for posting on the mailing list. I've met Nikolaas today
> in
> > > > > person and we were talking a bit about an interactive shell for
> > Flink,
> > > > > potentially also an integration with Zeppelin.
> > > > >
> > > > > Great stuff I'm really looking forward to :)
> > > > >
> > > > > We were wondering if somebody from the list has some experience
> with
> > > the
> > > > > scala shell.
> > > > > I've pointed Nikolaas also to this PR:
> > > > > https://github.com/apache/flink/pull/35.
> > > > >
> > > > > Best,
> > > > > Robert
> > > > >
> > > > >
> > > > > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <
> > > [hidden email]
> > > > >
> > > > > wrote:
> > > > >
> > > > >> Hi!
> > > > >> I am trying to implement a scala shell for flink.
> > > > >>
> > > > >> I've started with a simple scala object who's main function will
> > drop
> > > > the
> > > > >> user to the interactive scala shell (repl) at one point:
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> import scala.tools.nsc.interpreter.ILoop
> > > > >> import scala.tools.nsc.Settings
> > > > >>
> > > > >> object Job {
> > > > >>   def main(args: Array[String]) {
> > > > >>
> > > > >>     val repl = new ILoop()
> > > > >>     repl.settings = new Settings()
> > > > >>
> > > > >>     // enable this line to use scala in intellij
> > > > >>     repl.settings.usejavacp.value = true
> > > > >>
> > > > >>     repl.createInterpreter()
> > > > >>
> > > > >>     // start scala interpreter shell
> > > > >>     repl.process(repl.settings)
> > > > >>
> > > > >>     repl.closeInterpreter()
> > > > >>     }
> > > > >>   }
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> Now I am trying to execute the word count example as in:
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> scala> import org.apache.flink.api.scala._
> > > > >>
> > > > >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> > > > >>
> > > > >> scala> val text = env.fromElements("To be, or not to be,--that is
> > the
> > > > >> question:--","Whether 'tis nobler in the mind to suffer", "The
> > slings
> > > > and
> > > > >> arrows of outrageous fortune","Or to take arms against a sea of
> > > > troubles,")
> > > > >>
> > > > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")
> > }.map {
> > > > (_,
> > > > >> 1) }.groupBy(0).sum(1)
> > > > >>
> > > > >> scala> counts.print()
> > > > >>
> > > > >> scala> env.execute("Flink Scala Api Skeleton")
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> However I am running into following error:
> > > > >>
> > > > >> env.execute("Flink Scala Api Skeleton")
> > > > >> org.apache.flink.runtime.client.JobExecutionException:
> > > > >> java.lang.RuntimeException: The initialization of the DataSource's
> > > > outputs
> > > > >> caused an error: The type serializer factory could not load its
> > > > parameters
> > > > >> from the configuration due to missing classes.
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> > > > >> at
> org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> > > > >> Caused by: java.lang.RuntimeException: The type serializer factory
> > > could
> > > > >> not load its parameters from the configuration due to missing
> > classes.
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> > > > >> ... 8 more
> > > > >> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> > > > >> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > > >> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > > >> at java.security.AccessController.doPrivileged(Native Method)
> > > > >> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > > >> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > > >> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > > >> at java.lang.Class.forName0(Native Method)
> > > > >> at java.lang.Class.forName(Class.java:274)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> > > > >> at
> > > >
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> > > > >> at
> > > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> > > > >> at
> > > > >>
> > > >
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> > > > >> at
> > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >> at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> > > > >> ... 13 more
> > > > >>
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> > > > >> at
> > > > >>
> > > >
> > org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> > > > >> at .<init>(<console>:12)
> > > > >> at .<clinit>(<console>)
> > > > >> at .<init>(<console>:7)
> > > > >> at .<clinit>(<console>)
> > > > >> at $print(<console>)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >> at
> > > scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> > > > >> at
> > > scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> > > > >> at
> > scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> > > > >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> > > > >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> > > > >> at
> > > scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> > > > >> at
> > > >
> > scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> > > > >> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> > > > >> at
> scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> > > > >> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> > > > >> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> > > > >> at
> > > > >>
> > > >
> > >
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > > > >> at
> > > > >>
> > > >
> > >
> >
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> > > > >> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> > > > >> at org.myorg.quickstart.Job$.main(Job.scala:37)
> > > > >> at org.myorg.quickstart.Job.main(Job.scala)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >> at
> > > > >>
> > > > >>
> > > >
> > >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >> at
> > > com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> > > > >>
> > > > >>
> > > > >>
> > > > >> I'm pretty new to Scala and Flink, so maybe someone has a
> suggestion
> > > or
> > > > can
> > > > >> point me in some direction?
> > > > >>
> > > > >> thanks,
> > > > >> Nikolaas
> > > > >>
> > > >
> > >
> >
>