hello,

i hope this is the right place for this question. I'm currently experimenting with and comparing Flink/Stratosphere and Apache Spark. My goal is to analyse large JSON files of Twitter data, and I'm looking for a way to parse the JSON tuples in a map function and put them into a DataSet. For this I'm using the Flink Scala API and json4s, but in Flink the problem is parsing the JSON:

    val words = cleaned.map { line => parse(line) }

The error message is:

    Error analyzing UDT org.json4s.JValue:
    Subtype org.json4s.JsonAST.JInt - Field num: BigInt - Unsupported type BigInt
    Subtype org.json4s.JsonAST.JArray - Field arr: List[org.json4s.JsonAST.JValue]
      - Subtype org.json4s.JsonAST.JInt - Field num: BigInt - Unsupported type BigInt
    Subtype org.json4s.JsonAST.JArray - Field arr: List[org.json4s.JsonAST.JValue]
      - Subtype org.json4s.JsonAST.JDecimal - Field num: BigDecimal - Unsupported type BigDecimal
    Subtype org.json4s.JsonAST.JDecimal - Field num: BigDecimal - Unsupported type BigDecimal
    Subtype org.json4s.JsonAST.JObject - Field obj: List[(String, org.json4s.JsonAST.JValue)]
      - Field _2: org.json4s.JsonAST.JValue
      - Subtype org.json4s.JsonAST.JInt - Field num: BigInt - Unsupported type BigInt
    Subtype org.json4s.JsonAST.JObject - Field obj: List[(String, org.json4s.JsonAST.JValue)]
      - Field _2: org.json4s.JsonAST.JValue
      - Subtype org.json4s.JsonAST.JDecimal - Field num: BigDecimal - Unsupported type BigDecimal

In Spark I found a way, based on https://gist.github.com/cotdp/b471cfff183b59d65ae1:

    val user_interest = lines.map(line => parse(line))
      .map { json =>
        implicit lazy val formats = org.json4s.DefaultFormats
        val name = (json \ "name").extract[String]
        val location_x = (json \ "location" \ "x").extract[Double]
        val location_y = (json \ "location" \ "y").extract[Double]
        val likes = (json \ "likes").extract[Seq[String]].map(_.toLowerCase()).mkString(";")
        UserInterest(name, location_x, location_y, likes)
      }

This works fine in Spark, but is it possible to do the same with Flink?

kind regards,
norman
Hi Norman,

right now it is only possible to use primitive types and case classes (of which tuples are a special case) as Scala data types. Your program should work if you omit the second map function and put that code into your first map function instead. This way you avoid having that custom JSON type as the data type of your first intermediate DataSet. Just let me know if you need more information.

Aljoscha

On Wed, Aug 6, 2014 at 5:15 PM, Norman Spangenberg <[hidden email]> wrote:
> this works fine in spark, but is it possible to do the same with flink?
Hello Aljoscha,

Thanks for your reply, it was really helpful. After some time to figure out the right syntax it worked perfectly:

    val user_interest = lines.map { line =>
      val parsed = parse(line)
      implicit lazy val formats = org.json4s.DefaultFormats
      val name = (parsed \ "name").extract[String]
      val location_x = (parsed \ "location" \ "x").extract[Double]
      val location_y = (parsed \ "location" \ "y").extract[Double]
      val likes = (parsed \ "likes").extract[Seq[String]].map(_.toLowerCase()).mkString(";")
      UserInterest(name, location_x, location_y, likes)
    }

Is this the best way to handle JSON data, or is there a more efficient way?

thank you,
norman

----------------------------------------------------------------
This message was sent using IMP, the Internet Messaging Program.
Hi,

I think it is a good way, yes. You could also handle the JSON parsing in a custom input format, but that would only shift the computation to a different place; performance should not be affected by it. (I think parsing JSON is slow no matter what you do and no matter which cluster processing framework you use. :D)

Aljoscha

On Fri, Aug 8, 2014 at 12:49 PM, Norman Spangenberg <[hidden email]> wrote:
> Is this the best way to handle with json data? or is there a more
> efficient way?
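For reference, a custom input format along those lines might look roughly like the following. This is only a sketch against Flink's `DelimitedInputFormat` as it exists in current Flink; the class and package names are assumptions and may differ in the Stratosphere release used in this thread, and `UserInterest` is the case class from the earlier posts:

```scala
import org.apache.flink.api.common.io.DelimitedInputFormat
import org.json4s._
import org.json4s.native.JsonMethods._

case class UserInterest(name: String, locationX: Double, locationY: Double, likes: String)

// Sketch: parse each input line into a UserInterest directly in the
// input format, so no json4s JValue ever becomes the type of a DataSet.
class UserInterestInputFormat extends DelimitedInputFormat[UserInterest] {
  override def readRecord(reuse: UserInterest, bytes: Array[Byte],
                          offset: Int, numBytes: Int): UserInterest = {
    implicit val formats = DefaultFormats
    val json = parse(new String(bytes, offset, numBytes, "UTF-8"))
    UserInterest(
      (json \ "name").extract[String],
      (json \ "location" \ "x").extract[Double],
      (json \ "location" \ "y").extract[Double],
      (json \ "likes").extract[Seq[String]].map(_.toLowerCase).mkString(";"))
  }
}
```

The DataSet would then be created from this format instead of from `readTextFile` plus a map, but as Aljoscha notes, this mainly moves the parsing to a different place rather than making it cheaper.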
Thank you Aljoscha,

Sorry, but now the next problem occurs. The code I've posted works fine locally (in Eclipse), but in the cluster environment there is a problem: NoClassDefFoundError: org/json4s/Formats. I'm not really sure whether this problem is caused by Stratosphere/YARN or by json4s. A little bit weird is that the same functionality (and code) with the same dependencies works fine in the cluster with Spark, without any error. I'm getting the following error message:

    eu.stratosphere.client.program.ProgramInvocationException: The program execution failed: java.lang.NoClassDefFoundError: org/json4s/Formats
        at TwitterWeather$$anon$17.map(TwitterWeather.scala:143)
        at TwitterWeather$$anon$17.map(TwitterWeather.scala:143)
        at eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver.collect(ChainedCollectorMapDriver.java:71)
        at TwitterWeather$$anon$16.map(TwitterWeather.scala:141)
        at TwitterWeather$$anon$16.map(TwitterWeather.scala:141)
        at eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver.collect(ChainedCollectorMapDriver.java:71)
        at TwitterWeather$$anon$15$$anonfun$map$5.apply(TwitterWeather.scala:141)
        at TwitterWeather$$anon$15$$anonfun$map$5.apply(TwitterWeather.scala:141)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at TwitterWeather$$anon$15.map(TwitterWeather.scala:141)
        at TwitterWeather$$anon$15.map(TwitterWeather.scala:141)
        at eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver.collect(ChainedCollectorMapDriver.java:71)
        at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:171)
        at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
        at java.lang.Thread.run(Thread.java:744)
    Caused by: java.lang.ClassNotFoundException: org.json4s.Formats
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 16 more
        at eu.stratosphere.client.program.Client.run(Client.java:316)
        at eu.stratosphere.client.program.Client.run(Client.java:282)
        at eu.stratosphere.client.program.Client.run(Client.java:276)
        at eu.stratosphere.client.program.Client.run(Client.java:220)
        at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
        at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
        at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
        at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)

kind regards
Norman
Hi,

it seems you have to put the json4s jar into the lib folder of your Flink (Stratosphere) installation on every slave node. Are you using YARN or our own cluster management?

Aljoscha

On Sun, Aug 10, 2014 at 10:36 PM, Norman Spangenberg <[hidden email]> wrote:
> The code i've posted works fine locally (in eclipse). but in the cluster
> environement there's a problem: NoClassDefFoundError: org/json4s/Formats
If you are building the jar file using maven, you can also use the maven-assembly-plugin to build a fat jar (jar-with-dependencies). The dependencies will then be packed into the job's jar file.

Robert

On Mon, Aug 11, 2014 at 7:38 AM, Aljoscha Krettek <[hidden email]> wrote:
> it seems you have to put the json4s jar into the lib folder of your Flink
> (Stratosphere) installation on every Slave Node. Are you using yarn or our
> own cluster management?
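A minimal configuration for that might look like the following in the job's pom.xml, using the standard `jar-with-dependencies` descriptor of the maven-assembly-plugin (a sketch; the surrounding `<build>/<plugins>` section is assumed to exist):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <!-- bundle all compile/runtime dependencies (e.g. json4s) into the job jar -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

Running `mvn package` then produces an additional `<artifact>-jar-with-dependencies.jar` next to the normal jar, which can be submitted as the job jar.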
Hello Aljoscha and Robert,

Sorry for that stupid question. Building a fat JAR with maven worked for me, thank you. I actually tried to copy the json4s JARs to the lib folders of the cluster, but that didn't work. In YARN cluster mode, what is the right directory to put those JARs in? Is it hadoop-VERSION/share/hadoop/common/lib/ ?

kind regards
norman
Hi,

Did you use our quickstart (= maven archetype) scripts to set up your maven project? We should integrate the "maven-assembly-plugin" configuration with the fat jar preconfigured into the archetype so that users automatically get their dependencies included (https://issues.apache.org/jira/browse/FLINK-1056).

The YARN package contains a directory "ship". Everything placed in there will be shipped to all YARN containers. The files are placed in the working directory of each process and also loaded into the JVM's classpath. So if you put your jars there, they should be loadable by the classloader.

Robert

On Tue, Aug 12, 2014 at 10:32 AM, Norman Spangenberg <[hidden email]> wrote:
> In Yarn-Cluster-Mode: where is the right directory to put that JARs? Is it
> hadoop-VERSION/share/hadoop/common/lib/ ?
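Concretely, shipping the jars could look like this. The directory layout and jar names are assumptions (they depend on the unpacked YARN package and the json4s version), and the jar here is created with `touch` only to keep the commands self-contained:

```shell
# Everything placed in the YARN package's "ship" directory is shipped to
# all YARN containers and ends up on each JVM's classpath there.
YARN_PKG=stratosphere-yarn-0.5          # assumed name of the unpacked YARN package
mkdir -p "$YARN_PKG/ship"
touch json4s-core_2.10-3.2.10.jar       # stand-in for the real dependency jar
cp json4s-core_2.10-3.2.10.jar "$YARN_PKG/ship/"
```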