parse json-file with scala-api and json4s


parse json-file with scala-api and json4s

normanSp
Hello,
I hope this is the right place for this question.
I'm currently experimenting with and comparing Flink/Stratosphere and Apache Spark.
My goal is to analyse large JSON files of Twitter data, and I'm now looking for a way to parse the JSON tuples in a map function and put them into a DataSet.
For this I'm using the Flink Scala API and json4s.
But in Flink the problem is parsing the JSON file:

val words = cleaned.map { line => parse(line) }
The error message is:

    Error analyzing UDT org.json4s.JValue:
      Subtype org.json4s.JsonAST.JInt - Field num: BigInt - Unsupported type BigInt
      Subtype org.json4s.JsonAST.JArray - Field arr: List[org.json4s.JsonAST.JValue] - Subtype org.json4s.JsonAST.JInt - Field num: BigInt - Unsupported type BigInt
      Subtype org.json4s.JsonAST.JArray - Field arr: List[org.json4s.JsonAST.JValue] - Subtype org.json4s.JsonAST.JDecimal - Field num: BigDecimal - Unsupported type BigDecimal
      Subtype org.json4s.JsonAST.JDecimal - Field num: BigDecimal - Unsupported type BigDecimal
      Subtype org.json4s.JsonAST.JObject - Field obj: List[(String, org.json4s.JsonAST.JValue)] - Field _2: org.json4s.JsonAST.JValue - Subtype org.json4s.JsonAST.JInt - Field num: BigInt - Unsupported type BigInt
      Subtype org.json4s.JsonAST.JObject - Field obj: List[(String, org.json4s.JsonAST.JValue)] - Field _2: org.json4s.JsonAST.JValue - Subtype org.json4s.JsonAST.JDecimal - Field num: BigDecimal - Unsupported type BigDecimal


In Spark I found a way based on
https://gist.github.com/cotdp/b471cfff183b59d65ae1 :

val user_interest = lines.map(line => parse(line))
                         .map(json => {
                           implicit lazy val formats = org.json4s.DefaultFormats
                           val name = (json \ "name").extract[String]
                           val location_x = (json \ "location" \ "x").extract[Double]
                           val location_y = (json \ "location" \ "y").extract[Double]
                           val likes = (json \ "likes").extract[Seq[String]].map(_.toLowerCase()).mkString(";")
                           UserInterest(name, location_x, location_y, likes)
                         })

This works fine in Spark, but is it possible to do the same in Flink?

Kind regards,
Norman

Re: parse json-file with scala-api and json4s

Aljoscha Krettek-2
Hi Norman,
right now it is only possible to use primitive types and case classes (of
which tuples are a special case) as data types in the Scala API. Your program
could work if you omit the second map function and instead put that code into
your first map function. This way you avoid having the custom JSON type as the
data type of your first intermediate DataSet.
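A minimal sketch of that combined approach, assuming the json4s `parse`/`extract` API and the `UserInterest` case class from the original post (the helper name `parseLine` is hypothetical):

```scala
import org.json4s._
import org.json4s.native.JsonMethods.parse

case class UserInterest(name: String, locationX: Double, locationY: Double, likes: String)

// Parse one JSON line straight into a case class, so the DataSet element
// type is UserInterest rather than org.json4s.JValue.
def parseLine(line: String): UserInterest = {
  implicit val formats: Formats = DefaultFormats
  val json = parse(line)
  UserInterest(
    (json \ "name").extract[String],
    (json \ "location" \ "x").extract[Double],
    (json \ "location" \ "y").extract[Double],
    (json \ "likes").extract[Seq[String]].map(_.toLowerCase).mkString(";"))
}
```

In the Flink job this then becomes a single map, e.g. `cleaned.map(parseLine)`, with no JValue-typed DataSet in between.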

Just let me know if you need more information.

Aljoscha


On Wed, Aug 6, 2014 at 5:15 PM, Norman Spangenberg <
[hidden email]> wrote:

> [...]

Re: parse json-file with scala-api and json4s

Norman Spangenberg-2
In reply to this post by normanSp
Hello Aljoscha,
Thanks for your reply, it was really helpful.
After some time figuring out the right syntax, it worked perfectly:

val user_interest = lines.map(line => {
  val parsed = parse(line)
  implicit lazy val formats = org.json4s.DefaultFormats
  val name = (parsed \ "name").extract[String]
  val location_x = (parsed \ "location" \ "x").extract[Double]
  val location_y = (parsed \ "location" \ "y").extract[Double]
  val likes = (parsed \ "likes").extract[Seq[String]].map(_.toLowerCase()).mkString(";")
  UserInterest(name, location_x, location_y, likes)
})

Is this the best way to handle JSON data, or is there a more efficient way?

thank you,
norman


----------------------------------------------------------------
This message was sent using IMP, the Internet Messaging Program.



Re: parse json-file with scala-api and json4s

Aljoscha Krettek-2
Hi,
I think it is a good way, yes. You could also handle the JSON parsing in a
custom input format, but that would only shift the computation to a different
place; performance should not be affected by this. (I think parsing JSON is
slow no matter what you do and no matter which cluster processing framework
you use. :D)

Aljoscha


On Fri, Aug 8, 2014 at 12:49 PM, Norman Spangenberg <
[hidden email]> wrote:

> [...]

Re: parse json-file with scala-api and json4s

normanSp
In reply to this post by normanSp
Thank you, Aljoscha.
Sorry, but now the next problem occurs.
The code I've posted works fine locally (in Eclipse), but in the cluster
environment there's a problem: NoClassDefFoundError: org/json4s/Formats.
I'm not really sure whether this problem is caused by Stratosphere/YARN
or by json4s.
A little bit weird is that the same functionality (and code) with the same
dependencies works fine on the cluster in Spark, without any error.
I'm getting the following error message:

eu.stratosphere.client.program.ProgramInvocationException: The program execution failed: java.lang.NoClassDefFoundError: org/json4s/Formats
         at TwitterWeather$$anon$17.map(TwitterWeather.scala:143)
         at TwitterWeather$$anon$17.map(TwitterWeather.scala:143)
         at eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver.collect(ChainedCollectorMapDriver.java:71)
         at TwitterWeather$$anon$16.map(TwitterWeather.scala:141)
         at TwitterWeather$$anon$16.map(TwitterWeather.scala:141)
         at eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver.collect(ChainedCollectorMapDriver.java:71)
         at TwitterWeather$$anon$15$$anonfun$map$5.apply(TwitterWeather.scala:141)
         at TwitterWeather$$anon$15$$anonfun$map$5.apply(TwitterWeather.scala:141)
         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
         at TwitterWeather$$anon$15.map(TwitterWeather.scala:141)
         at TwitterWeather$$anon$15.map(TwitterWeather.scala:141)
         at eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver.collect(ChainedCollectorMapDriver.java:71)
         at eu.stratosphere.pact.runtime.task.DataSourceTask.invoke(DataSourceTask.java:171)
         at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
         at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: org.json4s.Formats
         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
         at java.security.AccessController.doPrivileged(Native Method)
         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         ... 16 more

         at eu.stratosphere.client.program.Client.run(Client.java:316)
         at eu.stratosphere.client.program.Client.run(Client.java:282)
         at eu.stratosphere.client.program.Client.run(Client.java:276)
         at eu.stratosphere.client.program.Client.run(Client.java:220)
         at eu.stratosphere.client.CliFrontend.executeProgram(CliFrontend.java:327)
         at eu.stratosphere.client.CliFrontend.run(CliFrontend.java:314)
         at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:927)
         at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)

Kind regards,
Norman


Re: parse json-file with scala-api and json4s

Aljoscha Krettek-2
Hi,
it seems you have to put the json4s jar into the lib folder of your Flink
(Stratosphere) installation on every slave node. Are you using YARN or our
own cluster management?

Aljoscha


On Sun, Aug 10, 2014 at 10:36 PM, Norman Spangenberg <
[hidden email]> wrote:

> [...]

Re: parse json-file with scala-api and json4s

Robert Metzger
If you are building the jar file with Maven, you can also use the
maven-assembly-plugin to build a fat jar (jar-with-dependencies).
The dependencies will then be packed into the job's jar file.
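For reference, the usual maven-assembly-plugin setup for this looks roughly as follows (a sketch of the standard configuration; the plugin version and any main-class settings are omitted and depend on your project):

```xml
<!-- In pom.xml, under <build><plugins>: build a fat jar so that
     json4s and the other dependencies travel with the job jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

Running `mvn package` should then produce an additional `*-jar-with-dependencies.jar` next to the regular artifact; that is the jar to submit as the job.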


On Mon, Aug 11, 2014 at 7:38 AM, Aljoscha Krettek <[hidden email]>
wrote:

> [...]

Re: parse json-file with scala-api and json4s

normanSp
In reply to this post by normanSp
Hello Aljoscha and Robert,
Sorry for the stupid question. Building a fat JAR with Maven worked for me,
thank you.
I actually tried to copy the json4s JARs to the lib folders of the cluster,
but that didn't work.
In YARN cluster mode, what is the right directory to put those JARs in? Is
it hadoop-VERSION/share/hadoop/common/lib/ ?

Kind regards,
Norman


Re: parse json-file with scala-api and json4s

Robert Metzger
Hi,

Did you use our quickstart (= Maven archetype) scripts to set up your Maven
project?
We should integrate the maven-assembly-plugin configuration, with the fat jar
preconfigured, into the archetype so that users automatically get their
dependencies included (https://issues.apache.org/jira/browse/FLINK-1056).

The YARN package contains a directory "ship". Everything placed in there
will be shipped to all YARN containers. The files are placed in the working
directory of each process and are also loaded into the JVM's classpath, so
if you put your jars there, they should be loadable by the classloader.
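As a sketch of that staging step (the directory names and the jar version are illustrative, and the layout is simulated under a temporary directory; in a real setup you would copy into the ship directory of your actual Stratosphere/Flink YARN package):

```shell
# Simulate staging the json4s jar into the YARN "ship" directory so it is
# shipped to every container and lands on each JVM's classpath.
WORK=$(mktemp -d)
mkdir -p "$WORK/stratosphere-yarn/ship"
touch "$WORK/json4s-core_2.10-3.2.10.jar"       # stand-in for the real jar
cp "$WORK"/json4s-*.jar "$WORK/stratosphere-yarn/ship/"
ls "$WORK/stratosphere-yarn/ship"
```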


Robert




On Tue, Aug 12, 2014 at 10:32 AM, Norman Spangenberg <
[hidden email]> wrote:

> [...]