neo4j - Flink connector


neo4j - Flink connector

Vasiliki Kalavri
Hello everyone,

Martin, Martin, Alex (cc'ed) and I have started discussing implementing
a neo4j-Flink connector. I've opened a corresponding JIRA
(FLINK-2941) containing an initial document [1], but we'd also like to
share our ideas here to engage the community and get your feedback.

We had a Skype call today and I will try to summarize some of the key
points here. The main use cases we see are the following:

- Use Flink for ETL / creating the graph and then insert it into a graph
database, like neo4j, for querying and search.
- Query neo4j on some topic or search the graph for patterns and extract a
subgraph, on which we'd then like to run some iterative graph analysis
task. This is where Flink/Gelly can help, by complementing the querying
(neo4j) with efficient iterative computation.

We all agreed that the main challenge is efficiently getting the data out
of neo4j and into Flink. There have been some attempts to do similar things
with neo4j and Spark, but the performance results are not very promising:

- Mazerunner [2] uses HDFS for communication. We don't think it's worth
going in this direction, as dumping the neo4j database to HDFS and then
reading it back into Flink would probably be terribly slow.
- In [3], you can see Michael Hunger's findings on using neo4j's HTTP
interface to import data into Spark, run PageRank and then put the data
back into neo4j. It seems this took > 2h for a 125M-edge graph. The main
bottlenecks appear to be (1) reading the data as an RDD, which had to be
done in small batches to avoid OOM errors, and (2) the PageRank
computation itself, which seems weird to me.

We decided to experiment with neo4j HTTP and Flink and we'll report back
when we have some results.

In the meantime, if you have any ideas on how we could speed up reading
from neo4j, or any suggestions on approaches that I haven't mentioned,
please feel free to reply to this e-mail or add your comment in the shared
document.

Cheers,
-Vasia.

[1]:
https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing
[2]: https://github.com/kbastani/neo4j-mazerunner
[3]: https://gist.github.com/jexp/0dfad34d49a16000e804

Fwd: neo4j - Flink connector

Vasiliki Kalavri
Forwarding these here to keep dev@ in the loop :)


---------- Forwarded message ----------
From: Martin Junghanns <[hidden email]>
Date: 29 October 2015 at 18:37
Subject: Re: neo4j - Flink connector
To: Martin Liesenberg <[hidden email]>, Vasia Kalavri <
[hidden email]>
Cc: Alexander Keller <[hidden email]>, Martin Neumann <[hidden email]
>


My idea was to start with a (non-parallel) Neo4jInputFormat for Flink and
see how this REST endpoint works with streaming. As Cypher query results
are rows in the end, this should work well with Flink Tuples (similar to
our JDBCInputFormat).

I'll keep you updated!

Best,
Martin


On 29.10.2015 17:00, Martin Liesenberg wrote:

Also, if you need any help with the implementation, I'd be glad to chime
in.

Best regards
Martin

Martin Liesenberg <[hidden email]> wrote on Thu, 29 Oct
2015 at 15:07:

> While using neo4j for a small project at work, I came across the feature of
> streaming results from neo4j via the REST API [1]. We didn't end up
> using it, so I can't comment on performance etc., but intuitively it seems
> like a better way of chunking.
>
> Nice to see another connector for Flink. :)
>
> Best regards,
>
> Martin
>
> [1] http://neo4j.com/docs/stable/rest-api-streaming.html
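
For reference, the streaming switch documented in [1] is the X-Stream HTTP
header. A minimal, illustrative Java sketch of using it against the legacy
Cypher endpoint (endpoint path and query here are assumptions, not from the
thread):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class StreamingCypherSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative endpoint; adjust host/port/path to your Neo4j server.
        URL url = new URL("http://localhost:7474/db/data/cypher");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("X-Stream", "true"); // the switch from [1]
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setRequestProperty("Accept", "application/json");
        conn.setDoOutput(true);

        String payload = "{\"query\": \"MATCH (n) RETURN id(n) LIMIT 10\"}";
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload.getBytes("UTF-8"));
        }

        // With X-Stream, the server writes the JSON result incrementally
        // instead of assembling the full response in memory first.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}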

Re: neo4j - Flink connector

Martin Junghanns
In reply to this post by Vasiliki Kalavri
Hi,

I wanted to give you a little update. I created a non-parallel
InputFormat which reads Cypher results from Neo4j into Tuples [1].
It can be used like the JDBCInputFormat:

String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)";

Neo4jInputFormat<Tuple2<Integer, Integer>> neoInput =
Neo4jInputFormat.buildNeo4jInputFormat()
       .setRestURI(restURI)
       .setCypherQuery(q)
       .setUsername("neo4j")
       .setPassword("test")
       .setConnectTimeout(1000)
       .setReadTimeout(1000)
       .finish();
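
A minimal usage sketch for wiring this into a job (createInput and
TupleTypeInfo are the standard Flink APIs for custom input formats;
neoInput is the builder result above):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read the Cypher result rows into a DataSet of (source id, target id) pairs.
DataSet<Tuple2<Integer, Integer>> edges = env.createInput(neoInput,
       new TupleTypeInfo<Tuple2<Integer, Integer>>(
               BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));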

At the moment, a Neo4j instance needs to be up and running to run the tests.
I tried to get neo4j-harness [2] into the project, but there are some
dependency conflicts which I need to figure out.

I ran a first benchmark on my laptop (4 cores, 12GB, SSD) with Neo4j
running on the same machine. My dataset is the Polish wiktionary dump [3],
which consists of 430,602 pages and 2,727,302 links. The protocol is:

1) Read edge ids from cold Neo4j into Tuple2<Integer, Integer>
2) Convert Tuple2 into Tuple3<Integer, Integer, Double> for edge value
3) Create Gelly graph from Tuple3, init vertex values to 1.0
4) Run PageRank with beta=0.85 and 5 iterations

This takes about 22 seconds on my machine, which is very promising.
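
In code, steps 2) to 4) look roughly as follows. This is a sketch assuming
the edges DataSet and its ExecutionEnvironment env from step 1;
Graph.fromTupleDataSet and the PageRank library algorithm are Gelly API,
though exact signatures may differ between Gelly versions:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.PageRank;

// Step 2: attach a default edge value of 1.0.
DataSet<Tuple3<Integer, Integer, Double>> weightedEdges = edges
       .map(new MapFunction<Tuple2<Integer, Integer>,
                            Tuple3<Integer, Integer, Double>>() {
           @Override
           public Tuple3<Integer, Integer, Double> map(Tuple2<Integer, Integer> e) {
               return new Tuple3<>(e.f0, e.f1, 1.0);
           }
       });

// Step 3: build the Gelly graph, initializing every vertex value to 1.0.
Graph<Integer, Double, Double> graph = Graph.fromTupleDataSet(weightedEdges,
       new MapFunction<Integer, Double>() {
           @Override
           public Double map(Integer vertexId) {
               return 1.0;
           }
       }, env);

// Step 4: PageRank with beta = 0.85 and 5 iterations; in the Gelly version
// assumed here, run() returns the graph with ranks as vertex values.
DataSet<Vertex<Integer, Double>> ranks =
       graph.run(new PageRank<Integer>(0.85, 5)).getVertices();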

Next steps are:
- OutputFormat
- Better Unit tests (integrate neo4j-harness)
- Bigger graphs :)

Any ideas and suggestions are of course highly appreciated :)

Best,
Martin


[1] https://github.com/s1ck/flink-neo4j
[2] http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
[3] plwiktionary-20151002-pages-articles-multistream.xml.bz2



Re: neo4j - Flink connector

Stephan Ewen
Wow, very nice results :-)

This input format alone is probably a very useful contribution, so I would
open a pull request for it once you manage to get a few tests running.

I know little about neo4j: is there a way to read Cypher query results in
parallel? (Most systems do not expose such an interface, but maybe neo4j is
special there.)

I recall Martin Neumann asking at some point about a way to create
dense contiguous unique IDs for graphs that can be bulk-imported
into neo4j. There is code for that in the DataSet utils; this may be
valuable for an output format.
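
The utility meant here is presumably DataSetUtils.zipWithIndex, which
assigns dense, contiguous Long indices; a small sketch:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Assign dense, contiguous unique ids (0..n-1) to vertex names,
// e.g. before bulk-importing the graph into neo4j.
DataSet<String> names = env.fromElements("alice", "bob", "carol");
DataSet<Tuple2<Long, String>> withIds = DataSetUtils.zipWithIndex(names);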


Re: neo4j - Flink connector

Martin Junghanns
Hi,

I could use your input on testing the input format with Flink.

As I already mentioned, Neo4j offers a dedicated module (neo4j-harness)
for unit testing server extensions / REST applications. The problem here
is that the dependencies of Flink conflict with the dependencies of
neo4j-harness (e.g. jetty, scala-library). I tried to figure out which
combination could work using the Maven exclude mechanism, but without success.

So I thought about workarounds:

(1) Michael Hunger (neo4j) started a project and invited me to
contribute [1]. What it does during tests is:
- download a neo4j-<version>.tar.gz into a temp folder
- extract and start a neo4j instance
- run tests
- stop and discard neo4j

I like the concept, but the problem is that it runs outside of Maven,
and I guess downloading from external resources (especially on travis-ci)
could lead to problems.

(2) I had a look at the other input formats. flink-hbase uses examples
instead of unit tests. This could be an option as long as there is no
clean solution for "real" unit testing.

What do you think?

Cheers, Martin


[1] https://github.com/jexp/neo4j-starter



Re: neo4j - Flink connector

Robert Metzger
Hi Martin,

what exactly were the issues you were facing with the dependency conflicts?

There is a way around these issues, and it's called shading:
https://maven.apache.org/plugins/maven-shade-plugin/
In Flink we have several shaded modules (curator, hadoop). We could add a
neo4j-harness-shaded module which relocates conflicting dependencies into a
different namespace. That way, you can run different versions of the
same library (jetty, scala) at the same time.
Since the module would only contain dependencies needed at
test time, we could exclude it from releases.
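
A relocation in such a module would look roughly like this (module name and
shaded package prefix are made up for illustration):

<!-- pom.xml of a hypothetical neo4j-harness-shaded module -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- move neo4j's jetty out of the way of Flink's jetty -->
            <pattern>org.eclipse.jetty</pattern>
            <shadedPattern>shaded.neo4j.org.eclipse.jetty</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>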

Regarding Scala, it would be fine to execute the neo4j tests only with
Scala 2.11 builds. It's not hard to control this using Maven build profiles.
Do you really need jetty? Is neo4j also starting the web interface for the
tests?
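
For the profile idea, a sketch (profile id and property name are
illustrative, not Flink's actual build setup):

<!-- in the parent pom: build the neo4j module only against Scala 2.11 -->
<profile>
  <id>scala-2.11</id>
  <activation>
    <property><name>scala-2.11</name></property>
  </activation>
  <modules>
    <module>flink-neo4j</module>
  </modules>
</profile>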

Regards,
Robert



Re: neo4j - Flink connector

Martin Junghanns
Hi Robert,

Thank you for the hints. I tried to narrow down the error:

Flink version: 0.10-SNAPSHOT
Neo4j version: 2.3.0

I start with two dependencies:
flink-java
flink-gelly

(1) Add neo4j-harness and run the basic example from Neo4j [1]
Leads to:

java.lang.ClassNotFoundException: org.eclipse.jetty.server.ConnectionFactory

(2) I excluded jetty-server from flink-java and flink-gelly
It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:

java.lang.NoSuchMethodError:
org.eclipse.jetty.servlet.ServletContextHandler.<init>

(3) I excluded jetty-servlet from flink-java and flink-gelly
It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:

java.lang.NoSuchMethodError: scala.Predef$.$conforms()

(4) I excluded scala-library from flink-java and flink-gelly
It now uses scala-library:2.11.7 (was 2.10.4)

Now the basic Neo4j example (without Flink) runs.
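
In pom terms, the exclusions from steps (2) to (4) amount to something like
this sketch (the same exclusions would be applied to flink-gelly):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>0.10-SNAPSHOT</version>
  <exclusions>
    <!-- steps (2) and (3): let neo4j-harness bring its own jetty 9.x -->
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-server</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-servlet</artifactId>
    </exclusion>
    <!-- step (4): let neo4j bring scala-library 2.11.x -->
    <exclusion>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
    </exclusion>
  </exclusions>
</dependency>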

Next, I added Flink to the mix and wrote a simple test using
neo4j-harness features, ExecutionEnvironment and my InputFormat.
Leads to:

java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
        at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
        at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
        at akka.actor.RootActorPath.$div(ActorPath.scala:159)
        at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
        at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
        at scala.util.Try$.apply(Try.scala:192)
        at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
        at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
        at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
        at scala.util.Success.flatMap(Try.scala:231)
        at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
        at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
        at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
        at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
        at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
        at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
        at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
        at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
        at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
        at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
        at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
        at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.immutable.Range.foreach(Range.scala:166)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
        at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
        at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
        at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
        at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
        at org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)

This is where I don't know what to exclude next. It seems that some
components (akka?) need Scala 2.10.4, while Neo4j (Cypher) depends on Scala
2.11.7.

How can I make use of the maven shade plugin in that case?

Again, thank you!

Cheers,
Martin

[1]
http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html 
(testMyExtensionWithFunctionFixture())



Re: neo4j - Flink connector

Martin Junghanns
Hi,

I am a bit stuck on that dependency problem. Any help would be
appreciated, as I would like to continue working on the formats. Thanks!

Best,
Martin


Re: neo4j - Flink connector

Robert Metzger
Sorry for the delay.
So the plan of this work is to add a neo4j connector to Flink, right?

While looking at the pom files of neo4j I found that it's GPLv3 licensed,
and Apache projects cannot depend on / link with GPL code [1].
So we cannot make the module part of the Flink source.
However, it's actually quite easy to publish code to Maven Central, so you
could release it on your own there.
If that is too much work for you, I can also start a github project like
"flink-gpl" with access to Maven Central where we can release stuff like
this.

Is this repository [2] your current work in progress on the dependency
issue?
Maybe the neo4j dependency expects Scala 2.11 and there is no Scala 2.10
build available. In this case, we could require Flink users to use the
Scala 2.11 build of Flink when they want to use neo4j.
I think I can help you much better as soon as I have your current pom file
+ code.

[1] http://www.apache.org/legal/resolved.html#category-a
[2] https://github.com/s1ck/flink-neo4j



Re: neo4j - Flink connector

Martin Junghanns
Hi Robert,

Thank you for the reply! At the moment we are just experimenting with Neo4j
and Flink, but the InputFormat should eventually be available in Flink.

Concerning the license: I did not think of that, but yes, I can make it
available in Maven central. I just need to find out how to do this.

I created a branch that reproduces the dependency problem [1]. There is a
test case "neo4jOnly" [2] which does not use Flink and works fine in a
project where only neo4j-harness is included. However, when I add
flink-java and flink-gelly (excluding flink-clients because of jetty, as
sketched below) to the project, the neo4jOnly test fails with:

org.neo4j.server.ServerStartupException: Starting Neo4j failed:
com.sun.jersey.core.reflection.ReflectionHelper.classForNamePA(Ljava/lang/String;)Ljava/security/PrivilegedAction;
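
The exclusion looks roughly like this in the pom (a sketch; see the branch
[1] for the exact setup, and the snippet assumes flink-clients comes in as
a transitive dependency of flink-gelly):

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly</artifactId>
    <version>0.10-SNAPSHOT</version>
    <exclusions>
      <!-- flink-clients drags in a jetty version that clashes with neo4j-harness -->
      <exclusion>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
      </exclusion>
    </exclusions>
  </dependency>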

I compared the dependencies of the "clean" neo4j-harness project and made
sure the dependencies and versions are the same. ReflectionHelper is part
of jersey-core, which is included.

This is really weird because, as I wrote before, the simple neo4jOnly test
still ran a few days ago. Were there any dependency changes in
0.10-SNAPSHOT?
Even if this gets resolved, the next failure would be caused by the
scala-library version conflict.

Again, thanks for your help.

Best,
Martin

[1] https://github.com/s1ck/flink-neo4j/tree/dependency_problem
[2]
https://github.com/s1ck/flink-neo4j/blob/dependency_problem/src/test/java/org/apache/flink/api/java/io/neo4j/Neo4jInputTest.java#L32


Re: neo4j - Flink connector

Robert Metzger
Have a look at my updated version of your code:
https://github.com/rmetzger/scratch/tree/dependency_problem
It now executes both tests; however, I was not able to get the second test
to pass. It seems that Neo4j's web server returns a 500 status code when
open()'ing the connection.
I'm not sure how to debug this issue.


Re: neo4j - Flink connector

Robert Metzger
I've had a private conversation with Martin to work further on this issue.

The problem is that "flink-neo4j" uses jersey-client version 1.19 (Neo4j
uses that version as well). Our flink-shaded-hadoop2 jar contains
jersey-client version 1.9, which appears first on the classpath, so the
JVM loads the wrong jersey version.
Maven cannot handle the version conflict because it does not know that
flink-shaded-hadoop2 contains jersey (it has a dependency-reduced pom).

For now, I suggest relocating jersey-client; a sketch follows below.
Another user contacted me with a similar issue; in that case it is the
Apache httpcomponents classes that are included in our
flink-shaded-hadoop2 jar.
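
Such a relocation could look roughly like this in the flink-shaded-hadoop2
pom (a sketch only; the shaded package name below is a placeholder, not
the actual name used in the Flink build):

  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
      <execution>
        <phase>package</phase>
        <goals>
          <goal>shade</goal>
        </goals>
        <configuration>
          <relocations>
            <!-- move jersey into a private namespace so that user code
                 can bring its own jersey version -->
            <relocation>
              <pattern>com.sun.jersey</pattern>
              <shadedPattern>org.apache.flink.hadoop.shaded.com.sun.jersey</shadedPattern>
            </relocation>
          </relocations>
        </configuration>
      </execution>
    </executions>
  </plugin>

With that in place, the jersey-client 1.19 from flink-neo4j should no
longer collide with the 1.9 copy inside the shaded Hadoop jar.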



On Fri, Nov 13, 2015 at 1:31 PM, Robert Metzger <[hidden email]> wrote:

> Have a look at my updated version of your code:
> https://github.com/rmetzger/scratch/tree/dependency_problem
> It now executes both tests, however, I was not able to get the second test
> to pass. It seems that Neo4j's web server is returning a 500 status code
> when open()'ing the connection.
> I'm not sure how to debug this issue.
>
> On Thu, Nov 12, 2015 at 2:19 PM, Martin Junghanns <[hidden email]
> > wrote:
>
>> Hi Robert,
>>
>> Thank you for the reply! At the moment we just "play" with Neo4j and
>> Flink but the InputFormat shall be available in Flink eventually.
>>
>> Concerning the license: I did not think of that, but yes, I can make it
>> available in maven central. I just need to find out how to do this.
>>
>> I created a branch that includes the dependency problem [1]. There is a
>> test case "neo4jOnly" [2] which does not use Flink and works fine in a
>> project where only neo4j-harness is included. However, when I add
>> flink-java and flink-gelly (excluding flink-clients because of jetty) to
>> the project, the neo4jOnly test fails with:
>>
>> org.neo4j.server.ServerStartupException: Starting Neo4j failed:
>> com.sun.jersey.core.reflection.ReflectionHelper.classForNamePA(Ljava/lang/String;)Ljava/security/PrivilegedAction;
>>
>> I compared the depedencies of the "clean" neo4j-harness project and made
>> sure the dependencies and versions are the same. ReflectionHelper is part
>> of jersey-core which is included.
>>
>> This is really weird, because - as I wrote before - the simple neo4jOnly
>> test ran a few days ago. Were there any changes concerning dependencies in
>> 0.10-SNAPSHOT?
>> However, the next thing which would fail is caused by the scala-library
>> version conflict.
>>
>> Again, thanks for your help.
>>
>> Best,
>> Martin
>>
>> [1] https://github.com/s1ck/flink-neo4j/tree/dependency_problem
>> [2]
>> https://github.com/s1ck/flink-neo4j/blob/dependency_problem/src/test/java/org/apache/flink/api/java/io/neo4j/Neo4jInputTest.java#L32
>>
>>
>> On 12.11.2015 12:51, Robert Metzger wrote:
>>
>>> Sorry for the delay.
>>> So the plan of this work is to add a neo4j connector into Flink, right?
>>>
>>> While looking at the pom files of neo4j I found that its GPLv3 licensed,
>>> and Apache projects can not depend/link with GPL code [1].
>>> So I we can not make the module part of the Flink source.
>>> However, its actually quite easy to publish code into Maven central, so
>>> you
>>> could release it on your own into Maven.
>>> If that is too much work for you, I can also start a github project like
>>> "flink-gpl" with access to maven central where we can release stuff like
>>> this.
>>>
>>> Is this repository [2] your current work in progress on the dependency
>>> issue?
>>> Maybe the neo4j dependency expects scala 2.11 and there is no scala 2.10
>>> build out. In this case, we could require Flink users to use the scala
>>> 2.11
>>> build of Flink when they want to use neo4j.
>>> I think I can help you much better as soon as I have your current pom
>>> file
>>> + code.
>>>
>>> [1] http://www.apache.org/legal/resolved.html#category-a
>>> [2] https://github.com/s1ck/flink-neo4j
>>>
>>>
>>> On Wed, Nov 11, 2015 at 7:38 PM, Martin Junghanns <
>>> [hidden email]>
>>> wrote:
>>>
>>> Hi,
>>>>
>>>> I am a bit stuck with that dependency problem. Any help would be
>>>> appreciated as I would like to continue working on the formats. Thanks!
>>>>
>>>> Best,
>>>> Martin
>>>>
>>>>
>>>> On 07.11.2015 17:28, Martin Junghanns wrote:
>>>>
>>>> Hi Robert,
>>>>>
>>>>> Thank you for the hints. I tried to narrow down the error:
>>>>>
>>>>> Flink version: 0.10-SNAPSHOT
>>>>> Neo4j version: 2.3.0
>>>>>
>>>>> I start with two dependencies:
>>>>> flink-java
>>>>> flink-gelly
>>>>>
>>>>> (1) Add neo4j-harness and run basic example from Neo4j [1]
>>>>> Leads to:
>>>>>
>>>>> java.lang.ClassNotFoundException:
>>>>> org.eclipse.jetty.server.ConnectionFactory
>>>>>
>>>>> (2) I excluded jetty-server from flink-java and flink-gelly
>>>>> It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1)
>>>>> Leads to:
>>>>>
>>>>> leads to: java.lang.NoSuchMethodError:
>>>>> org.eclipse.jetty.servlet.ServletContextHandler.<init>
>>>>>
>>>>> (3) I excluded jetty-servlet from flink-java and flink-gelly
>>>>> It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1)
>>>>> Leads to:
>>>>>
>>>>> java.lang.NoSuchMethodError: scala.Predef$.$conforms()
>>>>>
>>>>> (4) I excluded scala-library from flink-java and flink-gelly
>>>>> It now uses scala-library:2.11.7 (was 2.10.4)
>>>>>
>>>>> Now, the basic Neo4j example (without Flink runs).
>>>>>
>>>>> Next, I added Flink to the mix and wrote a simple test using
>>>>> neo4j-harness features, ExecutionEnvironment and my InputFormat.
>>>>> Leads to:
>>>>>
>>>>> java.lang.NoSuchMethodError:
>>>>>
>>>>>
>>>>> scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
>>>>>
>>>>>       at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
>>>>>       at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
>>>>>       at akka.actor.RootActorPath.$div(ActorPath.scala:159)
>>>>>       at
>>>>> akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
>>>>>       at
>>>>> akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
>>>>>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>>>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>       at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>>>>       at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>>>>>       at scala.util.Try$.apply(Try.scala:192)
>>>>>       at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>>>>>       at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>>>>>       at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>>>>>       at scala.util.Success.flatMap(Try.scala:231)
>>>>>       at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
>>>>>       at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
>>>>>       at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
>>>>>       at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>>>>>       at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>>>>>       at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
>>>>>       at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
>>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
>>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
>>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
>>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
>>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
>>>>>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>>>>>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>>>>>       at scala.collection.immutable.Range.foreach(Range.scala:166)
>>>>>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>>>>>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
>>>>>       at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
>>>>>       at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
>>>>>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
>>>>>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
>>>>>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
>>>>>       at org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)
>>>>>
>>>>> This is where I don't know what to exclude next. It seems that some
>>>>> components (akka?) need Scala 2.10.4, while Neo4j (Cypher) depends on
>>>>> Scala 2.11.7.
>>>>>
>>>>> How can I make use of the maven shade plugin in that case?
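>>>>>
>>>>> (For anyone retracing this: as far as I know, Maven can list which
>>>>> artifacts pull in which scala-library version, e.g.
>>>>>
>>>>>     mvn dependency:tree -Dincludes=org.scala-lang
>>>>>
>>>>> which should at least show where the 2.10.x and 2.11.x copies come
>>>>> from.)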
>>>>>
>>>>> Again, thank you!
>>>>>
>>>>> Cheers,
>>>>> Martin
>>>>>
>>>>> [1]
>>>>> http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
>>>>> (testMyExtensionWithFunctionFixture())
>>>>>
>>>>>
>>>>> On 06.11.2015 16:17, Robert Metzger wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> what exactly were the issues you were facing with the dependency
>>>>>> conflicts?
>>>>>>
>>>>>> There is a way around these issues, and it's called shading:
>>>>>> https://maven.apache.org/plugins/maven-shade-plugin/
>>>>>> In Flink we have several shaded modules (curator, hadoop). We could
>>>>>> add a neo4j-harness-shaded module which relocates conflicting
>>>>>> dependencies into a different namespace. That way, you can run
>>>>>> different versions of the same library (jetty, scala) at the same
>>>>>> time. Since the module would only contain dependencies needed at
>>>>>> test time, we could exclude it from releases.
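>>>>>>
>>>>>> Roughly, a relocation of the scala classes could look like this in
>>>>>> such a shaded module's pom.xml (the shaded package name below is just
>>>>>> a placeholder, not an actual Flink module):
>>>>>>
>>>>>>   <plugin>
>>>>>>     <groupId>org.apache.maven.plugins</groupId>
>>>>>>     <artifactId>maven-shade-plugin</artifactId>
>>>>>>     <executions>
>>>>>>       <execution>
>>>>>>         <phase>package</phase>
>>>>>>         <goals><goal>shade</goal></goals>
>>>>>>         <configuration>
>>>>>>           <relocations>
>>>>>>             <!-- move neo4j-harness' scala classes out of the way -->
>>>>>>             <relocation>
>>>>>>               <pattern>scala</pattern>
>>>>>>               <shadedPattern>org.apache.flink.neo4j.shaded.scala</shadedPattern>
>>>>>>             </relocation>
>>>>>>           </relocations>
>>>>>>         </configuration>
>>>>>>       </execution>
>>>>>>     </executions>
>>>>>>   </plugin>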
>>>>>>
>>>>>> Regarding Scala, it would be fine to execute the neo4j tests only
>>>>>> with Scala 2.11 builds. It's not hard to control this using Maven
>>>>>> build profiles.
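>>>>>>
>>>>>> For example, the neo4j module could be activated only in a scala-2.11
>>>>>> profile of the parent pom, along these lines (the module name is just
>>>>>> a placeholder):
>>>>>>
>>>>>>   <profile>
>>>>>>     <id>scala-2.11</id>
>>>>>>     <modules>
>>>>>>       <module>flink-neo4j</module>
>>>>>>     </modules>
>>>>>>   </profile>
>>>>>>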
>>>>>> Do you really need jetty? Is neo4j starting the web interface for
>>>>>> the tests as well?
>>>>>>
>>>>>> Regards,
>>>>>> Robert
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 6, 2015 at 4:09 PM, Martin Junghanns <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I could use your input on testing the input format with Flink.
>>>>>>>
>>>>>>> As I already mentioned, Neo4j offers a dedicated module
>>>>>>> (neo4j-harness) for unit testing server extensions / REST
>>>>>>> applications. The problem here is that the dependencies of Flink
>>>>>>> conflict with the dependencies of neo4j-harness (e.g. jetty,
>>>>>>> scala-library). I tried to figure out which combination could run
>>>>>>> using the maven exclude mechanism, but without success.
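>>>>>>>
>>>>>>> For context, the harness-based test I am after would look roughly
>>>>>>> like this, following the pattern from the neo4j docs (the fixture,
>>>>>>> URI handling and class name here are illustrative only):
>>>>>>>
>>>>>>>   import org.junit.Test;
>>>>>>>   import org.neo4j.harness.ServerControls;
>>>>>>>   import org.neo4j.harness.TestServerBuilders;
>>>>>>>
>>>>>>>   public class Neo4jInputHarnessTest {
>>>>>>>     @Test
>>>>>>>     public void inputFormatTest() throws Exception {
>>>>>>>       // spin up an in-process Neo4j with a small fixture graph
>>>>>>>       try (ServerControls server = TestServerBuilders
>>>>>>>           .newInProcessBuilder()
>>>>>>>           .withFixture("CREATE (p1:Page)-[:Link]->(p2:Page)")
>>>>>>>           .newServer()) {
>>>>>>>         // the in-process server exposes its own REST endpoint
>>>>>>>         String restURI = server.httpURI().resolve("db/data/").toString();
>>>>>>>         // point the Neo4jInputFormat at restURI and execute the job
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }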
>>>>>>>
>>>>>>> So I thought about workarounds:
>>>>>>>
>>>>>>> (1) Michael Hunger (neo4j) started a project and invited me to
>>>>>>> contribute [1]. What it does during tests is:
>>>>>>> - download a neo4j-<version>.tar.gz into a temp folder
>>>>>>> - extract and start a neo4j instance
>>>>>>> - run tests
>>>>>>> - stop and discard neo4j
>>>>>>>
>>>>>>> I like the concept, but the problem is that it runs outside of
>>>>>>> maven, and downloading from external resources (especially on
>>>>>>> travis-ci) could lead to problems.
>>>>>>>
>>>>>>> (2) I had a look into the other input formats. flink-hbase uses
>>>>>>> examples instead of unit tests. This could be an option as long as
>>>>>>> there is no clean solution for "real" unit testing.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Cheers, Martin
>>>>>>>
>>>>>>>
>>>>>>> [1] https://github.com/jexp/neo4j-starter
>>>>>>>
>>>>>>>
>>>>>>> On 03.11.2015 01:18, Stephan Ewen wrote:
>>>>>>>
>>>>>>>> Wow, very nice results :-)
>>>>>>>>
>>>>>>>> This input format alone is probably a very useful contribution, so
>>>>>>>> I would open a contribution there once you manage to get a few
>>>>>>>> tests running.
>>>>>>>>
>>>>>>>> I know little about neo4j: is there a way to read cypher query
>>>>>>>> results in parallel? (Most systems do not expose such an interface,
>>>>>>>> but maybe neo4j is special there.)
>>>>>>>>
>>>>>>>> I recall at some point in time Martin Neumann asking about a way to
>>>>>>>> create dense, contiguous, unique IDs for creating graphs that can
>>>>>>>> be bulk-imported into neo4j. There is code for that in the data set
>>>>>>>> utils; this may be valuable for an output format.
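>>>>>>>>
>>>>>>>> If I remember correctly, the helper is zipWithIndex in DataSetUtils
>>>>>>>> (name from memory, so double-check), roughly:
>>>>>>>>
>>>>>>>>   import org.apache.flink.api.java.utils.DataSetUtils;
>>>>>>>>
>>>>>>>>   // vertexNames: any DataSet<String> of vertex identifiers
>>>>>>>>   // (illustrative); zipWithIndex assigns dense, contiguous
>>>>>>>>   // ids 0..n-1 across all partitions
>>>>>>>>   DataSet<Tuple2<Long, String>> withIds =
>>>>>>>>       DataSetUtils.zipWithIndex(vertexNames);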
>>>>>>>>
>>>>>>>> On Sat, Oct 31, 2015 at 9:51 AM, Martin Junghanns <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I wanted to give you a little update. I created a non-parallel
>>>>>>>>> InputFormat which reads Cypher results from Neo4j into Tuples [1].
>>>>>>>>> It can be used like the JDBCInputFormat:
>>>>>>>>>
>>>>>>>>> String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)";
>>>>>>>>>
>>>>>>>>> Neo4jInputFormat<Tuple2<Integer, Integer>> neoInput =
>>>>>>>>> Neo4jInputFormat.buildNeo4jInputFormat()
>>>>>>>>>         .setRestURI(restURI)
>>>>>>>>>         .setCypherQuery(q)
>>>>>>>>>         .setUsername("neo4j")
>>>>>>>>>         .setPassword("test")
>>>>>>>>>         .setConnectTimeout(1000)
>>>>>>>>>         .setReadTimeout(1000)
>>>>>>>>>         .finish();
>>>>>>>>>
>>>>>>>>> At the moment, to run the tests, a Neo4j instance needs to be up
>>>>>>>>> and running. I tried to get neo4j-harness [2] into the project,
>>>>>>>>> but there are some dependency conflicts which I need to figure out.
>>>>>>>>>
>>>>>>>>> I ran a first benchmark on my laptop (4 cores, 12GB, SSD) with
>>>>>>>>> Neo4j running on the same machine. My dataset is the Polish
>>>>>>>>> wiktionary dump [3], which consists of 430,602 pages and 2,727,302
>>>>>>>>> links. The protocol is as follows; a code sketch of steps 2-4 is
>>>>>>>>> below:
>>>>>>>>>
>>>>>>>>> 1) Read edge ids from cold Neo4j into Tuple2<Integer, Integer>
>>>>>>>>> 2) Convert each Tuple2 into a Tuple3<Integer, Integer, Double> to
>>>>>>>>>    attach an edge value
>>>>>>>>> 3) Create a Gelly graph from the Tuple3s, init vertex values to 1.0
>>>>>>>>> 4) Run PageRank with beta=0.85 and 5 iterations
>>>>>>>>>
>>>>>>>>> This takes about 22 seconds on my machine, which is very promising.
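>>>>>>>>>
>>>>>>>>> In code, steps 2-4 look roughly like this (written from memory
>>>>>>>>> against the current Gelly API, so exact signatures may differ;
>>>>>>>>> "edges" is the DataSet<Tuple2<Integer, Integer>> from step 1 and
>>>>>>>>> "env" the ExecutionEnvironment):
>>>>>>>>>
>>>>>>>>>   import org.apache.flink.api.common.functions.MapFunction;
>>>>>>>>>   import org.apache.flink.graph.Graph;
>>>>>>>>>   import org.apache.flink.graph.library.PageRank;
>>>>>>>>>
>>>>>>>>>   // 2) attach a default edge value to every edge
>>>>>>>>>   DataSet<Tuple3<Integer, Integer, Double>> weighted = edges.map(
>>>>>>>>>     new MapFunction<Tuple2<Integer, Integer>,
>>>>>>>>>                     Tuple3<Integer, Integer, Double>>() {
>>>>>>>>>       @Override
>>>>>>>>>       public Tuple3<Integer, Integer, Double> map(
>>>>>>>>>           Tuple2<Integer, Integer> e) {
>>>>>>>>>         return new Tuple3<>(e.f0, e.f1, 1.0);
>>>>>>>>>       }
>>>>>>>>>     });
>>>>>>>>>
>>>>>>>>>   // 3) build the graph, initializing every vertex value to 1.0
>>>>>>>>>   Graph<Integer, Double, Double> graph = Graph.fromTupleDataSet(
>>>>>>>>>     weighted,
>>>>>>>>>     new MapFunction<Integer, Double>() {
>>>>>>>>>       @Override
>>>>>>>>>       public Double map(Integer id) { return 1.0; }
>>>>>>>>>     },
>>>>>>>>>     env);
>>>>>>>>>
>>>>>>>>>   // 4) PageRank with beta = 0.85 and 5 iterations (the result
>>>>>>>>>   // holds the final ranks; its type depends on the Gelly version)
>>>>>>>>>   graph.run(new PageRank<Integer>(0.85, 5));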
>>>>>>>>>
>>>>>>>>> Next steps are:
>>>>>>>>> - OutputFormat
>>>>>>>>> - Better Unit tests (integrate neo4j-harness)
>>>>>>>>> - Bigger graphs :)
>>>>>>>>>
>>>>>>>>> Any ideas and suggestions are of course highly appreciated :)
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Martin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1] https://github.com/s1ck/flink-neo4j
>>>>>>>>> [2] http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
>>>>>>>>> [3] plwiktionary-20151002-pages-articles-multistream.xml.bz2
>>>>>>>>>
>>>>>>>>> On 29.10.2015 14:51, Vasiliki Kalavri wrote:
>>>>>>>>>> [...]